From 9e2eadc7e277bfc5f93c93630febb9be0b6cd680 Mon Sep 17 00:00:00 2001 From: Chris Baker <1675087+cgbaker@users.noreply.github.com> Date: Wed, 9 Sep 2020 22:30:40 +0000 Subject: [PATCH] added new policy capabilities for recommendations API state store: call-out to generic update of job recommendations from job update method recommendations API work, and http endpoint errors for OSS support for scaling polices in task block of job spec add query filters for ScalingPolicy list endpoint command: nomad scaling policy list: added -job and -type --- acl/policy.go | 44 ++--- acl/policy_test.go | 1 + api/recommendations.go | 123 ++++++++++++++ api/scaling.go | 2 +- api/tasks.go | 1 + api/util_test.go | 26 +++ command/agent/http_oss.go | 5 + command/agent/job_endpoint.go | 13 +- command/agent/scaling_endpoint.go | 17 +- command/agent/scaling_endpoint_test.go | 88 ++++++---- command/scaling_policy_list.go | 26 ++- command/scaling_policy_list_test.go | 69 +++----- jobspec/parse_group.go | 57 ++++--- jobspec/parse_task.go | 60 +++++++ jobspec/parse_test.go | 87 +++++++++- .../task-scaling-policy-invalid-resource.hcl | 17 ++ .../task-scaling-policy-invalid-type.hcl | 18 ++ .../task-scaling-policy-missing-name.hcl | 17 ++ .../task-scaling-policy-multi-cpu.hcl | 27 +++ .../task-scaling-policy-multi-name.hcl | 17 ++ jobspec/test-fixtures/task-scaling-policy.hcl | 27 +++ .../tg-scaling-policy-invalid-type.hcl | 17 ++ .../tg-scaling-policy-with-label.hcl | 16 ++ jobspec2/hcl_conversions.go | 125 ++++++++++++++ jobspec2/parse_test.go | 160 ++++++++++++++++++ nomad/alloc_endpoint.go | 6 +- nomad/job_endpoint.go | 17 +- nomad/mock/mock.go | 4 +- nomad/scaling_endpoint.go | 113 ++++++++++--- nomad/state/state_store.go | 103 ++++++----- nomad/state/state_store_oss.go | 10 ++ nomad/state/state_store_test.go | 115 +++---------- nomad/structs/structs.go | 23 ++- nomad/structs/structs_oss.go | 4 + tools/go.sum | 2 - .../hashicorp/nomad/api/recommendations.go | 123 ++++++++++++++ .../github.com/hashicorp/nomad/api/scaling.go | 2 +- .../github.com/hashicorp/nomad/api/tasks.go | 1 + 38 files changed, 1282 insertions(+), 301 deletions(-) create mode 100644 api/recommendations.go create mode 100644 jobspec/test-fixtures/task-scaling-policy-invalid-resource.hcl create mode 100644 jobspec/test-fixtures/task-scaling-policy-invalid-type.hcl create mode 100644 jobspec/test-fixtures/task-scaling-policy-missing-name.hcl create mode 100644 jobspec/test-fixtures/task-scaling-policy-multi-cpu.hcl create mode 100644 jobspec/test-fixtures/task-scaling-policy-multi-name.hcl create mode 100644 jobspec/test-fixtures/task-scaling-policy.hcl create mode 100644 jobspec/test-fixtures/tg-scaling-policy-invalid-type.hcl create mode 100644 jobspec/test-fixtures/tg-scaling-policy-with-label.hcl create mode 100644 vendor/github.com/hashicorp/nomad/api/recommendations.go diff --git a/acl/policy.go b/acl/policy.go index 3f92eac97..d1d5dcc4c 100644 --- a/acl/policy.go +++ b/acl/policy.go @@ -24,26 +24,27 @@ const ( // combined we take the union of all capabilities. If the deny capability is present, it // takes precedence and overwrites all other capabilities. - NamespaceCapabilityDeny = "deny" - NamespaceCapabilityListJobs = "list-jobs" - NamespaceCapabilityReadJob = "read-job" - NamespaceCapabilitySubmitJob = "submit-job" - NamespaceCapabilityDispatchJob = "dispatch-job" - NamespaceCapabilityReadLogs = "read-logs" - NamespaceCapabilityReadFS = "read-fs" - NamespaceCapabilityAllocExec = "alloc-exec" - NamespaceCapabilityAllocNodeExec = "alloc-node-exec" - NamespaceCapabilityAllocLifecycle = "alloc-lifecycle" - NamespaceCapabilitySentinelOverride = "sentinel-override" - NamespaceCapabilityCSIRegisterPlugin = "csi-register-plugin" - NamespaceCapabilityCSIWriteVolume = "csi-write-volume" - NamespaceCapabilityCSIReadVolume = "csi-read-volume" - NamespaceCapabilityCSIListVolume = "csi-list-volume" - NamespaceCapabilityCSIMountVolume = "csi-mount-volume" - NamespaceCapabilityListScalingPolicies = "list-scaling-policies" - NamespaceCapabilityReadScalingPolicy = "read-scaling-policy" - NamespaceCapabilityReadJobScaling = "read-job-scaling" - NamespaceCapabilityScaleJob = "scale-job" + NamespaceCapabilityDeny = "deny" + NamespaceCapabilityListJobs = "list-jobs" + NamespaceCapabilityReadJob = "read-job" + NamespaceCapabilitySubmitJob = "submit-job" + NamespaceCapabilityDispatchJob = "dispatch-job" + NamespaceCapabilityReadLogs = "read-logs" + NamespaceCapabilityReadFS = "read-fs" + NamespaceCapabilityAllocExec = "alloc-exec" + NamespaceCapabilityAllocNodeExec = "alloc-node-exec" + NamespaceCapabilityAllocLifecycle = "alloc-lifecycle" + NamespaceCapabilitySentinelOverride = "sentinel-override" + NamespaceCapabilityCSIRegisterPlugin = "csi-register-plugin" + NamespaceCapabilityCSIWriteVolume = "csi-write-volume" + NamespaceCapabilityCSIReadVolume = "csi-read-volume" + NamespaceCapabilityCSIListVolume = "csi-list-volume" + NamespaceCapabilityCSIMountVolume = "csi-mount-volume" + NamespaceCapabilityListScalingPolicies = "list-scaling-policies" + NamespaceCapabilityReadScalingPolicy = "read-scaling-policy" + NamespaceCapabilityReadJobScaling = "read-job-scaling" + NamespaceCapabilityScaleJob = "scale-job" + NamespaceCapabilitySubmitRecommendation = "submit-recommendation" ) var ( @@ -153,7 +154,7 @@ func isNamespaceCapabilityValid(cap string) bool { NamespaceCapabilityListScalingPolicies, NamespaceCapabilityReadScalingPolicy, NamespaceCapabilityReadJobScaling, NamespaceCapabilityScaleJob: return true // Separate the enterprise-only capabilities - case NamespaceCapabilitySentinelOverride: + case NamespaceCapabilitySentinelOverride, NamespaceCapabilitySubmitRecommendation: return true default: return false @@ -183,6 +184,7 @@ func expandNamespacePolicy(policy string) []string { NamespaceCapabilityAllocLifecycle, NamespaceCapabilityCSIMountVolume, NamespaceCapabilityCSIWriteVolume, + NamespaceCapabilitySubmitRecommendation, }...) switch policy { diff --git a/acl/policy_test.go b/acl/policy_test.go index ffb816b1f..60e4615ea 100644 --- a/acl/policy_test.go +++ b/acl/policy_test.go @@ -106,6 +106,7 @@ func TestParse(t *testing.T) { NamespaceCapabilityAllocLifecycle, NamespaceCapabilityCSIMountVolume, NamespaceCapabilityCSIWriteVolume, + NamespaceCapabilitySubmitRecommendation, }, }, { diff --git a/api/recommendations.go b/api/recommendations.go new file mode 100644 index 000000000..47414f24b --- /dev/null +++ b/api/recommendations.go @@ -0,0 +1,123 @@ +package api + +// Recommendations is used to query the recommendations endpoints. +type Recommendations struct { + client *Client +} + +// Recommendations returns a new handle on the recommendations endpoints. +func (c *Client) Recommendations() *Recommendations { + return &Recommendations{client: c} +} + +// List is used to dump all of the recommendations in the cluster +func (r *Recommendations) List(q *QueryOptions) ([]*Recommendation, *QueryMeta, error) { + var resp []*Recommendation + qm, err := r.client.query("/v1/recommendations", &resp, q) + if err != nil { + return nil, qm, err + } + return resp, qm, nil +} + +// Info is used to return information on a single recommendation +func (r *Recommendations) Info(id string, q *QueryOptions) (*Recommendation, *QueryMeta, error) { + var resp Recommendation + qm, err := r.client.query("/v1/recommendation/"+id, &resp, q) + if err != nil { + return nil, nil, err + } + return &resp, qm, nil +} + +// Upsert is used to create or update a recommendation +func (r *Recommendations) Upsert(rec *Recommendation, q *WriteOptions) (*Recommendation, *WriteMeta, error) { + var resp Recommendation + wm, err := r.client.write("/v1/recommendation", rec, &resp, q) + if err != nil { + return nil, nil, err + } + return &resp, wm, nil +} + +// Delete is used to delete a list of recommendations +func (r *Recommendations) Delete(ids []string, q *WriteOptions) (*WriteMeta, error) { + req := &RecommendationApplyRequest{ + Apply: []string{}, + Dismiss: ids, + } + wm, err := r.client.write("/v1/recommendations/apply", req, nil, q) + if err != nil { + return nil, err + } + return wm, nil +} + +// Apply is used to apply a set of recommendations +func (r *Recommendations) Apply(ids []string, policyOverride bool) ( + *RecommendationApplyResponse, *WriteMeta, error) { + req := &RecommendationApplyRequest{ + Apply: ids, + PolicyOverride: policyOverride, + } + var resp RecommendationApplyResponse + wm, err := r.client.write("/v1/recommendations/apply", req, &resp, nil) + if err != nil { + return nil, nil, err + } + resp.WriteMeta = *wm + return &resp, wm, nil +} + +// Recommendation is used to serialize a recommendation. +type Recommendation struct { + ID string + Region string + Namespace string + JobID string + JobVersion uint64 + Group string + Task string + Resource string + Value int + Current int + Meta map[string]interface{} + Stats map[string]float64 + EnforceVersion bool + + SubmitTime int64 + + CreateIndex uint64 + ModifyIndex uint64 +} + +// RecommendationApplyRequest is used to apply and/or dismiss a set of recommendations +type RecommendationApplyRequest struct { + Apply []string + Dismiss []string + PolicyOverride bool +} + +// RecommendationApplyResponse is used to apply a set of recommendations +type RecommendationApplyResponse struct { + UpdatedJobs []*SingleRecommendationApplyResult + Errors []*SingleRecommendationApplyError + WriteMeta +} + +type SingleRecommendationApplyResult struct { + Namespace string + JobID string + JobModifyIndex uint64 + EvalID string + EvalCreateIndex uint64 + Warnings string + Recommendations []string +} + +type SingleRecommendationApplyError struct { + Namespace string + JobID string + Recommendations []string + Error string +} diff --git a/api/scaling.go b/api/scaling.go index 56a0a964a..de889172d 100644 --- a/api/scaling.go +++ b/api/scaling.go @@ -66,12 +66,12 @@ type ScalingPolicy struct { Max *int64 `hcl:"max,optional"` Policy map[string]interface{} `hcl:"policy,block"` Enabled *bool `hcl:"enabled,optional"` + Type string `hcl:"type,optional"` /* fields set by server */ ID string Namespace string - Type string Target map[string]string CreateIndex uint64 ModifyIndex uint64 diff --git a/api/tasks.go b/api/tasks.go index b331ff6b0..2406a9bc3 100644 --- a/api/tasks.go +++ b/api/tasks.go @@ -671,6 +671,7 @@ type Task struct { ShutdownDelay time.Duration `mapstructure:"shutdown_delay" hcl:"shutdown_delay,optional"` KillSignal string `mapstructure:"kill_signal" hcl:"kill_signal,optional"` Kind string `hcl:"kind,optional"` + ScalingPolicies []*ScalingPolicy `hcl:"scaling,block"` } func (t *Task) Canonicalize(tg *TaskGroup, job *Job) { diff --git a/api/util_test.go b/api/util_test.go index 758855750..5c0fd524f 100644 --- a/api/util_test.go +++ b/api/util_test.go @@ -68,6 +68,32 @@ func testPeriodicJob() *Job { return job } +func testRecommendation(job *Job) *Recommendation { + rec := &Recommendation{ + ID: "", + Region: *job.Region, + Namespace: *job.Namespace, + JobID: *job.ID, + Group: *job.TaskGroups[0].Name, + Task: job.TaskGroups[0].Tasks[0].Name, + Resource: "CPU", + Value: *job.TaskGroups[0].Tasks[0].Resources.CPU * 2, + Meta: map[string]interface{}{ + "testing": true, + "mocked": "also true", + }, + Stats: map[string]float64{ + "median": 50.0, + "mean": 51.0, + "max": 75.5, + "99": 73.0, + "min": 0.0, + }, + EnforceVersion: false, + } + return rec +} + func testNamespace() *Namespace { return &Namespace{ Name: "test-namespace", diff --git a/command/agent/http_oss.go b/command/agent/http_oss.go index 49056510b..e90818141 100644 --- a/command/agent/http_oss.go +++ b/command/agent/http_oss.go @@ -17,6 +17,11 @@ func (s *HTTPServer) registerEnterpriseHandlers() { s.mux.HandleFunc("/v1/quota", s.wrap(s.entOnly)) s.mux.HandleFunc("/v1/operator/license", s.wrap(s.entOnly)) + + s.mux.HandleFunc("/v1/recommendation", s.wrap(s.entOnly)) + s.mux.HandleFunc("/v1/recommendations", s.wrap(s.entOnly)) + s.mux.HandleFunc("/v1/recommendations/apply", s.wrap(s.entOnly)) + s.mux.HandleFunc("/v1/recommendation/", s.wrap(s.entOnly)) } func (s *HTTPServer) entOnly(resp http.ResponseWriter, req *http.Request) (interface{}, error) { diff --git a/command/agent/job_endpoint.go b/command/agent/job_endpoint.go index 23e05f637..c4e6d01fd 100644 --- a/command/agent/job_endpoint.go +++ b/command/agent/job_endpoint.go @@ -982,7 +982,7 @@ func ApiTgToStructsTG(job *structs.Job, taskGroup *api.TaskGroup, tg *structs.Ta tg.Tasks = make([]*structs.Task, l) for l, task := range taskGroup.Tasks { t := &structs.Task{} - ApiTaskToStructsTask(task, t) + ApiTaskToStructsTask(job, tg, task, t) // Set the tasks vault namespace from Job if it was not // specified by the task or group @@ -996,7 +996,9 @@ func ApiTgToStructsTG(job *structs.Job, taskGroup *api.TaskGroup, tg *structs.Ta // ApiTaskToStructsTask is a copy and type conversion between the API // representation of a task from a struct representation of a task. -func ApiTaskToStructsTask(apiTask *api.Task, structsTask *structs.Task) { +func ApiTaskToStructsTask(job *structs.Job, group *structs.TaskGroup, + apiTask *api.Task, structsTask *structs.Task) { + structsTask.Name = apiTask.Name structsTask.Driver = apiTask.Driver structsTask.User = apiTask.User @@ -1033,6 +1035,13 @@ func ApiTaskToStructsTask(apiTask *api.Task, structsTask *structs.Task) { } } + if l := len(apiTask.ScalingPolicies); l != 0 { + structsTask.ScalingPolicies = make([]*structs.ScalingPolicy, l) + for i, policy := range apiTask.ScalingPolicies { + structsTask.ScalingPolicies[i] = ApiScalingPolicyToStructs(0, policy).TargetTask(job, group, structsTask) + } + } + if l := len(apiTask.Services); l != 0 { structsTask.Services = make([]*structs.Service, l) for i, service := range apiTask.Services { diff --git a/command/agent/scaling_endpoint.go b/command/agent/scaling_endpoint.go index 3781ff6ab..49a68f6ce 100644 --- a/command/agent/scaling_endpoint.go +++ b/command/agent/scaling_endpoint.go @@ -22,6 +22,12 @@ func (s *HTTPServer) scalingPoliciesListRequest(resp http.ResponseWriter, req *h if s.parse(resp, req, &args.Region, &args.QueryOptions) { return nil, nil } + if job := req.URL.Query().Get("job"); job != "" { + args.Job = job + } + if tpe := req.URL.Query().Get("type"); tpe != "" { + args.Type = tpe + } var out structs.ScalingPolicyListResponse if err := s.agent.RPC("Scaling.ListPolicies", &args, &out); err != nil { @@ -77,9 +83,14 @@ func (s *HTTPServer) scalingPolicyQuery(resp http.ResponseWriter, req *http.Requ func ApiScalingPolicyToStructs(count int, ap *api.ScalingPolicy) *structs.ScalingPolicy { p := structs.ScalingPolicy{ - Enabled: *ap.Enabled, - Policy: ap.Policy, - Target: map[string]string{}, + Type: ap.Type, + Policy: ap.Policy, + Target: map[string]string{}, + } + if ap.Enabled != nil { + p.Enabled = *ap.Enabled + } else { + p.Enabled = true } if ap.Max != nil { p.Max = *ap.Max diff --git a/command/agent/scaling_endpoint_test.go b/command/agent/scaling_endpoint_test.go index 10c3c4186..0abb21c0d 100644 --- a/command/agent/scaling_endpoint_test.go +++ b/command/agent/scaling_endpoint_test.go @@ -12,6 +12,7 @@ import ( ) func TestHTTP_ScalingPoliciesList(t *testing.T) { + require := require.New(t) t.Parallel() httpTest(t, nil, func(s *TestAgent) { for i := 0; i < 3; i++ { @@ -26,40 +27,75 @@ func TestHTTP_ScalingPoliciesList(t *testing.T) { }, } var resp structs.JobRegisterResponse - if err := s.Agent.RPC("Job.Register", &args, &resp); err != nil { - t.Fatalf("err: %v", err) - } + require.NoError(s.Agent.RPC("Job.Register", &args, &resp)) } // Make the HTTP request req, err := http.NewRequest("GET", "/v1/scaling/policies", nil) - if err != nil { - t.Fatalf("err: %v", err) - } + require.NoError(err) + respW := httptest.NewRecorder() // Make the request obj, err := s.Server.ScalingPoliciesRequest(respW, req) - if err != nil { - t.Fatalf("err: %v", err) - } + require.NoError(err) // Check for the index - if respW.Header().Get("X-Nomad-Index") == "" { - t.Fatalf("missing index") - } - if respW.Header().Get("X-Nomad-KnownLeader") != "true" { - t.Fatalf("missing known leader") - } - if respW.Header().Get("X-Nomad-LastContact") == "" { - t.Fatalf("missing last contact") - } + require.NotEmpty(respW.Header().Get("X-Nomad-Index"), "missing index") + require.NotEmpty(respW.Header().Get("X-Nomad-KnownLeader"), "missing known leader") + require.NotEmpty(respW.Header().Get("X-Nomad-LastContact"), "missing last contact") // Check the list l := obj.([]*structs.ScalingPolicyListStub) - if len(l) != 3 { - t.Fatalf("bad: %#v", l) + require.Len(l, 3) + }) +} + +func TestHTTP_ScalingPoliciesList_Filter(t *testing.T) { + t.Parallel() + require := require.New(t) + httpTest(t, nil, func(s *TestAgent) { + var job *structs.Job + for i := 0; i < 3; i++ { + // Create the job + job, _ = mock.JobWithScalingPolicy() + + args := structs.JobRegisterRequest{ + Job: job, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: structs.DefaultNamespace, + }, + } + var resp structs.JobRegisterResponse + require.NoError(s.Agent.RPC("Job.Register", &args, &resp)) } + + // Make the HTTP request + req, err := http.NewRequest("GET", "/v1/scaling/policies?job="+job.ID, nil) + require.NoError(err) + respW := httptest.NewRecorder() + + // Make the request + obj, err := s.Server.ScalingPoliciesRequest(respW, req) + require.NoError(err) + + // Check the list + l := obj.([]*structs.ScalingPolicyListStub) + require.Len(l, 1) + + // Request again, with policy type filter + req, err = http.NewRequest("GET", "/v1/scaling/policies?type=cluster", nil) + require.NoError(err) + respW = httptest.NewRecorder() + + // Make the request + obj, err = s.Server.ScalingPoliciesRequest(respW, req) + require.NoError(err) + + // Check the list + l = obj.([]*structs.ScalingPolicyListStub) + require.Len(l, 0) }) } @@ -90,15 +126,9 @@ func TestHTTP_ScalingPolicyGet(t *testing.T) { require.NoError(err) // Check for the index - if respW.Header().Get("X-Nomad-Index") == "" { - t.Fatalf("missing index") - } - if respW.Header().Get("X-Nomad-KnownLeader") != "true" { - t.Fatalf("missing known leader") - } - if respW.Header().Get("X-Nomad-LastContact") == "" { - t.Fatalf("missing last contact") - } + require.NotEmpty(respW.Header().Get("X-Nomad-Index"), "missing index") + require.NotEmpty(respW.Header().Get("X-Nomad-KnownLeader"), "missing known leader") + require.NotEmpty(respW.Header().Get("X-Nomad-LastContact"), "missing last contact") // Check the policy require.Equal(p.ID, obj.(*structs.ScalingPolicy).ID) diff --git a/command/scaling_policy_list.go b/command/scaling_policy_list.go index 0dd09cd25..1a15a8c51 100644 --- a/command/scaling_policy_list.go +++ b/command/scaling_policy_list.go @@ -31,6 +31,9 @@ General Options: Policy Info Options: + -type + Filter scaling policies by type. + -json Output the scaling policy in its JSON format. @@ -48,6 +51,8 @@ func (s *ScalingPolicyListCommand) Synopsis() string { func (s *ScalingPolicyListCommand) AutocompleteFlags() complete.Flags { return mergeAutocompleteFlags(s.Meta.AutocompleteFlags(FlagSetClient), complete.Flags{ + "-job": complete.PredictNothing, + "-type": complete.PredictNothing, "-json": complete.PredictNothing, "-t": complete.PredictAnything, }) @@ -59,12 +64,14 @@ func (s *ScalingPolicyListCommand) Name() string { return "scaling policy list" // Run satisfies the cli.Command Run function. func (s *ScalingPolicyListCommand) Run(args []string) int { var json bool - var tmpl string + var tmpl, policyType, job string flags := s.Meta.FlagSet(s.Name(), FlagSetClient) flags.Usage = func() { s.Ui.Output(s.Help()) } flags.BoolVar(&json, "json", false, "") flags.StringVar(&tmpl, "t", "", "") + flags.StringVar(&policyType, "type", "", "") + flags.StringVar(&job, "job", "", "") if err := flags.Parse(args); err != nil { return 1 } @@ -81,7 +88,16 @@ func (s *ScalingPolicyListCommand) Run(args []string) int { return 1 } - policies, _, err := client.Scaling().ListPolicies(nil) + q := &api.QueryOptions{ + Params: map[string]string{}, + } + if policyType != "" { + q.Params["type"] = policyType + } + if job != "" { + q.Params["job"] = job + } + policies, _, err := client.Scaling().ListPolicies(q) if err != nil { s.Ui.Error(fmt.Sprintf("Error listing scaling policies: %s", err)) return 1 @@ -103,7 +119,7 @@ func (s *ScalingPolicyListCommand) Run(args []string) int { } // Create the output table header. - output := []string{"ID|Enabled|Target"} + output := []string{"ID|Enabled|Type|Target"} // Sort the list of policies based on their target. sortedPolicies := scalingPolicyStubList{policies: policies} @@ -112,8 +128,8 @@ func (s *ScalingPolicyListCommand) Run(args []string) int { // Iterate the policies and add to the output. for _, policy := range sortedPolicies.policies { output = append(output, fmt.Sprintf( - "%s|%v|%s", - policy.ID, policy.Enabled, formatScalingPolicyTarget(policy.Target))) + "%s|%v|%s|%s", + policy.ID, policy.Enabled, policy.Type, formatScalingPolicyTarget(policy.Target))) } // Output. diff --git a/command/scaling_policy_list_test.go b/command/scaling_policy_list_test.go index 2c4475908..860646559 100644 --- a/command/scaling_policy_list_test.go +++ b/command/scaling_policy_list_test.go @@ -1,52 +1,36 @@ package command import ( - "fmt" - "strings" "testing" + "github.com/mitchellh/cli" + "github.com/stretchr/testify/require" + "github.com/hashicorp/nomad/api" "github.com/hashicorp/nomad/helper" - "github.com/hashicorp/nomad/testutil" - "github.com/mitchellh/cli" ) func TestScalingPolicyListCommand_Run(t *testing.T) { + require := require.New(t) t.Parallel() - srv, client, url := testServer(t, true, nil) + srv, client, url := testServer(t, false, nil) defer srv.Shutdown() - testutil.WaitForResult(func() (bool, error) { - nodes, _, err := client.Nodes().List(nil) - if err != nil { - return false, err - } - if len(nodes) == 0 { - return false, fmt.Errorf("missing node") - } - if _, ok := nodes[0].Drivers["mock_driver"]; !ok { - return false, fmt.Errorf("mock_driver not ready") - } - return true, nil - }, func(err error) { - t.Fatalf("err: %s", err) - }) ui := cli.NewMockUi() cmd := &ScalingPolicyListCommand{Meta: Meta{Ui: ui}} // Perform an initial list, which should return zero results. - if code := cmd.Run([]string{"-address=" + url}); code != 0 { - t.Fatalf("expected cmd run exit code 0, got: %d", code) - } - if out := ui.OutputWriter.String(); !strings.Contains(out, "No policies found") { - t.Fatalf("expected no policies found within output: %v", out) - } + code := cmd.Run([]string{"-address=" + url}) + require.Equal(0, code) + out := ui.OutputWriter.String() + require.Contains(out, "No policies found") // Generate two test jobs. jobs := []*api.Job{testJob("scaling_policy_list_1"), testJob("scaling_policy_list_2")} // Generate an example scaling policy. scalingPolicy := api.ScalingPolicy{ + Type: api.ScalingPolicyTypeHorizontal, Enabled: helper.BoolToPtr(true), Min: helper.Int64ToPtr(1), Max: helper.Int64ToPtr(1), @@ -55,29 +39,18 @@ func TestScalingPolicyListCommand_Run(t *testing.T) { // Iterate the jobs, add the scaling policy and register. for _, job := range jobs { job.TaskGroups[0].Scaling = &scalingPolicy - resp, _, err := client.Jobs().Register(job, nil) - if err != nil { - t.Fatalf("err: %s", err) - } - if code := waitForSuccess(ui, client, fullId, t, resp.EvalID); code != 0 { - t.Fatalf("expected waitForSuccess exit code 0, got: %d", code) - } + _, _, err := client.Jobs().Register(job, nil) + require.NoError(err) } // Perform a new list which should yield results.. - if code := cmd.Run([]string{"-address=" + url}); code != 0 { - t.Fatalf("expected cmd run exit code 0, got: %d", code) - } - out := ui.OutputWriter.String() - if !strings.Contains(out, "ID") || - !strings.Contains(out, "Enabled") || - !strings.Contains(out, "Target") { - t.Fatalf("expected table headers within output: %v", out) - } - if !strings.Contains(out, "scaling_policy_list_1") { - t.Fatalf("expected job scaling_policy_list_1 within output: %v", out) - } - if !strings.Contains(out, "scaling_policy_list_2") { - t.Fatalf("expected job scaling_policy_list_2 within output: %v", out) - } + code = cmd.Run([]string{"-address=" + url}) + require.Equal(0, code) + out = ui.OutputWriter.String() + require.Contains(out, "ID") + require.Contains(out, "Enabled") + require.Contains(out, "Type") + require.Contains(out, "Target") + require.Contains(out, "scaling_policy_list_1") + require.Contains(out, "scaling_policy_list_2") } diff --git a/jobspec/parse_group.go b/jobspec/parse_group.go index 2514d639e..8977afd51 100644 --- a/jobspec/parse_group.go +++ b/jobspec/parse_group.go @@ -185,7 +185,7 @@ func parseGroups(result *api.Job, list *ast.ObjectList) error { // Parse scaling policy if o := listVal.Filter("scaling"); len(o.Items) > 0 { - if err := parseScalingPolicy(&g.Scaling, o); err != nil { + if err := parseGroupScalingPolicy(&g.Scaling, o); err != nil { return multierror.Prefix(err, "scaling ->") } } @@ -319,21 +319,39 @@ func parseVolumes(out *map[string]*api.VolumeRequest, list *ast.ObjectList) erro return nil } -func parseScalingPolicy(out **api.ScalingPolicy, list *ast.ObjectList) error { - list = list.Elem() +func parseGroupScalingPolicy(out **api.ScalingPolicy, list *ast.ObjectList) error { if len(list.Items) > 1 { return fmt.Errorf("only one 'scaling' block allowed") } + item := list.Items[0] + if len(item.Keys) != 0 { + return fmt.Errorf("task group scaling policy should not have a name") + } + p, err := parseScalingPolicy(item) + if err != nil { + return err + } - // Get our resource object - o := list.Items[0] + // group-specific validation + if p.Max == nil { + return fmt.Errorf("missing 'max'") + } + if p.Type == "" { + p.Type = "horizontal" + } else if p.Type != "horizontal" { + return fmt.Errorf("task group scaling policy had invalid type: %q", p.Type) + } + *out = p + return nil +} +func parseScalingPolicy(item *ast.ObjectItem) (*api.ScalingPolicy, error) { // We need this later var listVal *ast.ObjectList - if ot, ok := o.Val.(*ast.ObjectType); ok { + if ot, ok := item.Val.(*ast.ObjectType); ok { listVal = ot.List } else { - return fmt.Errorf("should be an object") + return nil, fmt.Errorf("should be an object") } valid := []string{ @@ -341,14 +359,15 @@ func parseScalingPolicy(out **api.ScalingPolicy, list *ast.ObjectList) error { "max", "policy", "enabled", + "type", } - if err := checkHCLKeys(o.Val, valid); err != nil { - return err + if err := checkHCLKeys(item.Val, valid); err != nil { + return nil, err } var m map[string]interface{} - if err := hcl.DecodeObject(&m, o.Val); err != nil { - return err + if err := hcl.DecodeObject(&m, item.Val); err != nil { + return nil, err } delete(m, "policy") @@ -358,30 +377,26 @@ func parseScalingPolicy(out **api.ScalingPolicy, list *ast.ObjectList) error { Result: &result, }) if err != nil { - return err + return nil, err } if err := dec.Decode(m); err != nil { - return err - } - if result.Max == nil { - return fmt.Errorf("missing 'max'") + return nil, err } // If we have policy, then parse that if o := listVal.Filter("policy"); len(o.Items) > 0 { if len(o.Elem().Items) > 1 { - return fmt.Errorf("only one 'policy' block allowed per 'scaling' block") + return nil, fmt.Errorf("only one 'policy' block allowed per 'scaling' block") } p := o.Elem().Items[0] var m map[string]interface{} if err := hcl.DecodeObject(&m, p.Val); err != nil { - return err + return nil, err } if err := mapstructure.WeakDecode(m, &result.Policy); err != nil { - return err + return nil, err } } - *out = &result - return nil + return &result, nil } diff --git a/jobspec/parse_task.go b/jobspec/parse_task.go index 07d026145..d43198bf2 100644 --- a/jobspec/parse_task.go +++ b/jobspec/parse_task.go @@ -2,6 +2,7 @@ package jobspec import ( "fmt" + "strings" "time" multierror "github.com/hashicorp/go-multierror" @@ -23,6 +24,7 @@ var ( "kill_timeout", "shutdown_delay", "kill_signal", + "scaling", } normalTaskKeys = append(commonTaskKeys, @@ -110,6 +112,7 @@ func parseTask(item *ast.ObjectItem, keys []string) (*api.Task, error) { delete(m, "vault") delete(m, "volume_mount") delete(m, "csi_plugin") + delete(m, "scaling") // Build the task var t api.Task @@ -276,6 +279,13 @@ func parseTask(item *ast.ObjectItem, keys []string) (*api.Task, error) { } } + // Parse scaling policies + if o := listVal.Filter("scaling"); len(o.Items) > 0 { + if err := parseTaskScalingPolicies(&t.ScalingPolicies, o); err != nil { + return nil, err + } + } + // If we have a vault block, then parse that if o := listVal.Filter("vault"); len(o.Items) > 0 { v := &api.Vault{ @@ -462,6 +472,56 @@ func parseTemplates(result *[]*api.Template, list *ast.ObjectList) error { return nil } +func parseTaskScalingPolicies(result *[]*api.ScalingPolicy, list *ast.ObjectList) error { + if len(list.Items) == 0 { + return nil + } + + errPrefix := "scaling ->" + // Go through each object and turn it into an actual result. + seen := make(map[string]bool) + for _, item := range list.Items { + if l := len(item.Keys); l == 0 { + return multierror.Prefix(fmt.Errorf("task scaling policy missing name"), errPrefix) + } else if l > 1 { + return multierror.Prefix(fmt.Errorf("task scaling policy should only have one name"), errPrefix) + } + n := item.Keys[0].Token.Value().(string) + errPrefix = fmt.Sprintf("scaling[%v] ->", n) + + var policyType string + switch strings.ToLower(n) { + case "cpu": + policyType = "vertical_cpu" + case "mem": + policyType = "vertical_mem" + default: + return multierror.Prefix(fmt.Errorf(`scaling policy name must be "cpu" or "mem"`), errPrefix) + } + + // Make sure we haven't already found this + if seen[n] { + return multierror.Prefix(fmt.Errorf("scaling policy cannot be defined more than once"), errPrefix) + } + seen[n] = true + + p, err := parseScalingPolicy(item) + if err != nil { + return multierror.Prefix(err, errPrefix) + } + + if p.Type == "" { + p.Type = policyType + } else if p.Type != policyType { + return multierror.Prefix(fmt.Errorf("policy had invalid 'type': %q", p.Type), errPrefix) + } + + *result = append(*result, p) + } + + return nil +} + func parseResources(result *api.Resources, list *ast.ObjectList) error { list = list.Elem() if len(list.Items) == 0 { diff --git a/jobspec/parse_test.go b/jobspec/parse_test.go index bb36383f4..b0f397f1b 100644 --- a/jobspec/parse_test.go +++ b/jobspec/parse_test.go @@ -1153,7 +1153,6 @@ func TestParse(t *testing.T) { }, false, }, - { "tg-service-check.hcl", &api.Job{ @@ -1383,7 +1382,6 @@ func TestParse(t *testing.T) { }, false, }, - { "tg-scaling-policy.hcl", &api.Job{ @@ -1393,8 +1391,9 @@ func TestParse(t *testing.T) { { Name: stringToPtr("group"), Scaling: &api.ScalingPolicy{ - Min: int64ToPtr(5), - Max: int64ToPtr(100), + Type: "horizontal", + Min: int64ToPtr(5), + Max: int64ToPtr(100), Policy: map[string]interface{}{ "foo": "bar", "b": true, @@ -1414,6 +1413,47 @@ func TestParse(t *testing.T) { }, false, }, + { + "task-scaling-policy.hcl", + &api.Job{ + ID: stringToPtr("foo"), + Name: stringToPtr("foo"), + TaskGroups: []*api.TaskGroup{ + { + Name: stringToPtr("bar"), + Tasks: []*api.Task{ + { + Name: "bar", + Driver: "docker", + ScalingPolicies: []*api.ScalingPolicy{ + { + Type: "vertical_cpu", + Target: nil, + Min: int64ToPtr(50), + Max: int64ToPtr(1000), + Policy: map[string]interface{}{ + "test": "cpu", + }, + Enabled: boolToPtr(true), + }, + { + Type: "vertical_mem", + Target: nil, + Min: int64ToPtr(128), + Max: int64ToPtr(1024), + Policy: map[string]interface{}{ + "test": "mem", + }, + Enabled: boolToPtr(false), + }, + }, + }, + }, + }, + }, + }, + false, + }, { "tg-service-connect-gateway-ingress.hcl", &api.Job{ @@ -1480,6 +1520,7 @@ func TestParse(t *testing.T) { { Name: stringToPtr("group"), Scaling: &api.ScalingPolicy{ + Type: "horizontal", Min: nil, Max: int64ToPtr(10), Policy: nil, @@ -1490,19 +1531,51 @@ func TestParse(t *testing.T) { }, false, }, - { "tg-scaling-policy-missing-max.hcl", nil, true, }, - { "tg-scaling-policy-multi-policy.hcl", nil, true, }, - + { + "tg-scaling-policy-with-label.hcl", + nil, + true, + }, + { + "tg-scaling-policy-invalid-type.hcl", + nil, + true, + }, + { + "task-scaling-policy-missing-name.hcl", + nil, + true, + }, + { + "task-scaling-policy-multi-name.hcl", + nil, + true, + }, + { + "task-scaling-policy-multi-cpu.hcl", + nil, + true, + }, + { + "task-scaling-policy-invalid-type.hcl", + nil, + true, + }, + { + "task-scaling-policy-invalid-resource.hcl", + nil, + true, + }, { "multiregion.hcl", &api.Job{ diff --git a/jobspec/test-fixtures/task-scaling-policy-invalid-resource.hcl b/jobspec/test-fixtures/task-scaling-policy-invalid-resource.hcl new file mode 100644 index 000000000..4f2bd9647 --- /dev/null +++ b/jobspec/test-fixtures/task-scaling-policy-invalid-resource.hcl @@ -0,0 +1,17 @@ +job "foo" { + task "bar" { + driver = "docker" + + scaling "wrong" { + enabled = true + min = 50 + max = 1000 + + policy { + test = "cpu" + } + } + + } +} + diff --git a/jobspec/test-fixtures/task-scaling-policy-invalid-type.hcl b/jobspec/test-fixtures/task-scaling-policy-invalid-type.hcl new file mode 100644 index 000000000..e8313ab61 --- /dev/null +++ b/jobspec/test-fixtures/task-scaling-policy-invalid-type.hcl @@ -0,0 +1,18 @@ +job "foo" { + task "bar" { + driver = "docker" + + scaling "cpu" { + type = "vertical_mem" + enabled = true + min = 50 + max = 1000 + + policy { + test = "cpu" + } + } + + } +} + diff --git a/jobspec/test-fixtures/task-scaling-policy-missing-name.hcl b/jobspec/test-fixtures/task-scaling-policy-missing-name.hcl new file mode 100644 index 000000000..03c31f77d --- /dev/null +++ b/jobspec/test-fixtures/task-scaling-policy-missing-name.hcl @@ -0,0 +1,17 @@ +job "foo" { + task "bar" { + driver = "docker" + + scaling { + enabled = true + min = 50 + max = 1000 + + policy { + test = "cpu" + } + } + + } +} + diff --git a/jobspec/test-fixtures/task-scaling-policy-multi-cpu.hcl b/jobspec/test-fixtures/task-scaling-policy-multi-cpu.hcl new file mode 100644 index 000000000..dc8ead75b --- /dev/null +++ b/jobspec/test-fixtures/task-scaling-policy-multi-cpu.hcl @@ -0,0 +1,27 @@ +job "foo" { + task "bar" { + driver = "docker" + + scaling "cpu" { + enabled = true + min = 50 + max = 1000 + + policy { + test = "cpu" + } + } + + scaling "cpu" { + enabled = true + min = 50 + max = 1000 + + policy { + test = "cpu" + } + } + + } +} + diff --git a/jobspec/test-fixtures/task-scaling-policy-multi-name.hcl b/jobspec/test-fixtures/task-scaling-policy-multi-name.hcl new file mode 100644 index 000000000..fd3fbc993 --- /dev/null +++ b/jobspec/test-fixtures/task-scaling-policy-multi-name.hcl @@ -0,0 +1,17 @@ +job "foo" { + task "bar" { + driver = "docker" + + scaling "cpu" "mem" { + enabled = true + min = 50 + max = 1000 + + policy { + test = "cpu" + } + } + + } +} + diff --git a/jobspec/test-fixtures/task-scaling-policy.hcl b/jobspec/test-fixtures/task-scaling-policy.hcl new file mode 100644 index 000000000..408dd7ca6 --- /dev/null +++ b/jobspec/test-fixtures/task-scaling-policy.hcl @@ -0,0 +1,27 @@ +job "foo" { + task "bar" { + driver = "docker" + + scaling "cpu" { + enabled = true + min = 50 + max = 1000 + + policy { + test = "cpu" + } + } + + scaling "mem" { + enabled = false + min = 128 + max = 1024 + + policy { + test = "mem" + } + } + + } +} + diff --git a/jobspec/test-fixtures/tg-scaling-policy-invalid-type.hcl b/jobspec/test-fixtures/tg-scaling-policy-invalid-type.hcl new file mode 100644 index 000000000..b5d605b4a --- /dev/null +++ b/jobspec/test-fixtures/tg-scaling-policy-invalid-type.hcl @@ -0,0 +1,17 @@ +job "elastic" { + group "group" { + scaling { + type = "vertical_cpu" + enabled = false + min = 5 + max = 100 + + policy { + foo = "bar" + b = true + val = 5 + f = 0.1 + } + } + } +} diff --git a/jobspec/test-fixtures/tg-scaling-policy-with-label.hcl b/jobspec/test-fixtures/tg-scaling-policy-with-label.hcl new file mode 100644 index 000000000..8f25ef090 --- /dev/null +++ b/jobspec/test-fixtures/tg-scaling-policy-with-label.hcl @@ -0,0 +1,16 @@ +job "elastic" { + group "group" { + scaling "no-label-allowed" { + enabled = false + min = 5 + max = 100 + + policy { + foo = "bar" + b = true + val = 5 + f = 0.1 + } + } + } +} diff --git a/jobspec2/hcl_conversions.go b/jobspec2/hcl_conversions.go index 413a9e148..928981b3d 100644 --- a/jobspec2/hcl_conversions.go +++ b/jobspec2/hcl_conversions.go @@ -3,6 +3,7 @@ package jobspec2 import ( "fmt" "reflect" + "strings" "time" "github.com/hashicorp/hcl/v2" @@ -18,6 +19,7 @@ var hclDecoder *gohcl.Decoder func init() { hclDecoder = newHCLDecoder() hclDecoder.RegisterBlockDecoder(reflect.TypeOf(api.TaskGroup{}), decodeTaskGroup) + hclDecoder.RegisterBlockDecoder(reflect.TypeOf(api.Task{}), decodeTask) } func newHCLDecoder() *gohcl.Decoder { @@ -266,6 +268,7 @@ func decodeTaskGroup(body hcl.Body, ctx *hcl.EvalContext, val interface{}) hcl.D } d := newHCLDecoder() + d.RegisterBlockDecoder(reflect.TypeOf(api.Task{}), decodeTask) diags = d.DecodeBody(tgBody, ctx, tg) if tgExtra.Vault != nil { @@ -276,6 +279,128 @@ func decodeTaskGroup(body hcl.Body, ctx *hcl.EvalContext, val interface{}) hcl.D } } + if tg.Scaling != nil { + if tg.Scaling.Type == "" { + tg.Scaling.Type = "horizontal" + } + diags = append(diags, validateGroupScalingPolicy(tg.Scaling, tgBody)...) + } return diags } + +func decodeTask(body hcl.Body, ctx *hcl.EvalContext, val interface{}) hcl.Diagnostics { + // special case scaling policy + t := val.(*api.Task) + b, remain, diags := body.PartialContent(&hcl.BodySchema{ + Blocks: []hcl.BlockHeaderSchema{ + {Type: "scaling", LabelNames: []string{"name"}}, + }, + }) + + diags = append(diags, decodeTaskScalingPolicies(b.Blocks, ctx, t)...) + + decoder := newHCLDecoder() + diags = append(diags, decoder.DecodeBody(remain, ctx, val)...) + + return diags +} + +func decodeTaskScalingPolicies(blocks hcl.Blocks, ctx *hcl.EvalContext, task *api.Task) hcl.Diagnostics { + if len(blocks) == 0 { + return nil + } + + var diags hcl.Diagnostics + seen := map[string]*hcl.Block{} + for _, b := range blocks { + label := strings.ToLower(b.Labels[0]) + var policyType string + switch label { + case "cpu": + policyType = "vertical_cpu" + case "mem": + policyType = "vertical_mem" + default: + diags = append(diags, &hcl.Diagnostic{ + Severity: hcl.DiagError, + Summary: "Invalid scaling policy name", + Detail: `scaling policy name must be "cpu" or "mem"`, + Subject: &b.LabelRanges[0], + }) + continue + } + + if prev, ok := seen[label]; ok { + diags = append(diags, &hcl.Diagnostic{ + Severity: hcl.DiagError, + Summary: fmt.Sprintf("Duplicate scaling %q block", label), + Detail: fmt.Sprintf( + "Only one scaling %s block is allowed. Another was defined at %s.", + label, prev.DefRange.String(), + ), + Subject: &b.DefRange, + }) + continue + } + seen[label] = b + + var p api.ScalingPolicy + diags = append(diags, hclDecoder.DecodeBody(b.Body, ctx, &p)...) + + if p.Type == "" { + p.Type = policyType + } else if p.Type != policyType { + diags = append(diags, &hcl.Diagnostic{ + Severity: hcl.DiagError, + Summary: "Invalid scaling policy type", + Detail: fmt.Sprintf( + "Invalid policy type, expected %q but found %q", + p.Type, policyType), + Subject: &b.DefRange, + }) + continue + } + + task.ScalingPolicies = append(task.ScalingPolicies, &p) + } + + return diags +} + +func validateGroupScalingPolicy(p *api.ScalingPolicy, body hcl.Body) hcl.Diagnostics { + // fast path: do nothing + if p.Max != nil && p.Type == "horizontal" { + return nil + } + + content, _, diags := body.PartialContent(&hcl.BodySchema{ + Blocks: []hcl.BlockHeaderSchema{{Type: "scaling"}}, + }) + + if len(content.Blocks) == 0 { + // unexpected, given that we have a scaling policy + return diags + } + + pc, _, diags := content.Blocks[0].Body.PartialContent(&hcl.BodySchema{ + Attributes: []hcl.AttributeSchema{ + {Name: "max", Required: true}, + {Name: "type", Required: false}, + }, + }) + + if p.Type != "horizontal" { + if attr, ok := pc.Attributes["type"]; ok { + diags = append(diags, &hcl.Diagnostic{ + Severity: hcl.DiagError, + Summary: "Invalid group scaling type", + Detail: fmt.Sprintf( + "task group scaling policy had invalid type: %q", + p.Type), + Subject: attr.Expr.Range().Ptr(), + }) + } + } + return diags +} diff --git a/jobspec2/parse_test.go b/jobspec2/parse_test.go index 875f3f634..d774aa616 100644 --- a/jobspec2/parse_test.go +++ b/jobspec2/parse_test.go @@ -145,3 +145,163 @@ dynamic "group" { require.Equal(t, "groupB", *out.TaskGroups[1].Name) require.Equal(t, "groupC", *out.TaskGroups[2].Name) } + +func TestParse_InvalidScalingSyntax(t *testing.T) { + cases := []struct { + name string + expectedErr string + hcl string + }{ + { + "valid", + "", + ` +job "example" { + group "g1" { + scaling { + max = 40 + type = "horizontal" + } + + task "t1" { + scaling "cpu" { + max = 20 + } + scaling "mem" { + max = 15 + } + } + } +} +`, + }, + { + "group missing max", + `argument "max" is required`, + ` +job "example" { + group "g1" { + scaling { + #max = 40 + type = "horizontal" + } + + task "t1" { + scaling "cpu" { + max = 20 + } + scaling "mem" { + max = 15 + } + } + } +} +`, + }, + { + "group invalid type", + `task group scaling policy had invalid type`, + ` +job "example" { + group "g1" { + scaling { + max = 40 + type = "invalid_type" + } + + task "t1" { + scaling "cpu" { + max = 20 + } + scaling "mem" { + max = 15 + } + } + } +} +`, + }, + { + "task invalid label", + `scaling policy name must be "cpu" or "mem"`, + ` +job "example" { + group "g1" { + scaling { + max = 40 + type = "horizontal" + } + + task "t1" { + scaling "not_cpu" { + max = 20 + } + scaling "mem" { + max = 15 + } + } + } +} +`, + }, + { + "task duplicate blocks", + `Duplicate scaling "cpu" block`, + ` +job "example" { + group "g1" { + scaling { + max = 40 + type = "horizontal" + } + + task "t1" { + scaling "cpu" { + max = 20 + } + scaling "cpu" { + max = 15 + } + } + } +} +`, + }, + { + "task invalid type", + `Invalid scaling policy type`, + ` +job "example" { + group "g1" { + scaling { + max = 40 + type = "horizontal" + } + + task "t1" { + scaling "cpu" { + max = 20 + type = "invalid" + } + scaling "mem" { + max = 15 + } + } + } +} +`, + }, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + _, err := ParseWithArgs(c.name+".hcl", strings.NewReader(c.hcl), nil, true) + if c.expectedErr == "" { + require.NoError(t, err) + } else { + require.Error(t, err) + require.Contains(t, err.Error(), c.expectedErr) + } + }) + } +} diff --git a/nomad/alloc_endpoint.go b/nomad/alloc_endpoint.go index 7d518c568..0d6d7210c 100644 --- a/nomad/alloc_endpoint.go +++ b/nomad/alloc_endpoint.go @@ -37,6 +37,10 @@ func (a *Alloc) List(args *structs.AllocListRequest, reply *structs.AllocListRes return structs.ErrPermissionDenied } + allow := func(ns string) bool { + return aclObj.AllowNsOp(ns, acl.NamespaceCapabilityListJobs) + } + // Setup the blocking query opts := blockingOptions{ queryOpts: &args.QueryOptions, @@ -48,7 +52,7 @@ func (a *Alloc) List(args *structs.AllocListRequest, reply *structs.AllocListRes prefix := args.QueryOptions.Prefix if args.RequestNamespace() == structs.AllNamespacesSentinel { - allowedNSes, err := allowedNSes(aclObj, state) + allowedNSes, err := allowedNSes(aclObj, state, allow) if err != nil { return err } diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go index ed16809d1..379c759d4 100644 --- a/nomad/job_endpoint.go +++ b/nomad/job_endpoint.go @@ -451,16 +451,16 @@ func propagateScalingPolicyIDs(old, new *structs.Job) error { oldIDs := make(map[string]string) if old != nil { - // jobs currently only have scaling policies on task groups, so we can - // find correspondences using task group names + // use the job-scoped key (includes type, group, and task) to uniquely + // identify policies in a job for _, p := range old.GetScalingPolicies() { - oldIDs[p.Target[structs.ScalingTargetGroup]] = p.ID + oldIDs[p.JobKey()] = p.ID } } // ignore any existing ID in the policy, they should be empty for _, p := range new.GetScalingPolicies() { - if id, ok := oldIDs[p.Target[structs.ScalingTargetGroup]]; ok { + if id, ok := oldIDs[p.JobKey()]; ok { p.ID = id } else { p.ID = uuid.Generate() @@ -1265,7 +1265,7 @@ func (j *Job) GetJobVersions(args *structs.JobVersionsRequest, // allowedNSes returns a set (as map of ns->true) of the namespaces a token has access to. // Returns `nil` set if the token has access to all namespaces // and ErrPermissionDenied if the token has no capabilities on any namespace. -func allowedNSes(aclObj *acl.ACL, state *state.StateStore) (map[string]bool, error) { +func allowedNSes(aclObj *acl.ACL, state *state.StateStore, allow func(ns string) bool) (map[string]bool, error) { if aclObj == nil || aclObj.IsManagement() { return nil, nil } @@ -1279,7 +1279,7 @@ func allowedNSes(aclObj *acl.ACL, state *state.StateStore) (map[string]bool, err r := make(map[string]bool, len(nses)) for _, ns := range nses { - if aclObj.AllowNsOp(ns, acl.NamespaceCapabilityListJobs) { + if allow(ns) { r[ns] = true } } @@ -1367,6 +1367,9 @@ func (j *Job) listAllNamespaces(args *structs.JobListRequest, reply *structs.Job return err } prefix := args.QueryOptions.Prefix + allow := func(ns string) bool { + return aclObj.AllowNsOp(ns, acl.NamespaceCapabilityListJobs) + } // Setup the blocking query opts := blockingOptions{ @@ -1374,7 +1377,7 @@ func (j *Job) listAllNamespaces(args *structs.JobListRequest, reply *structs.Job queryMeta: &reply.QueryMeta, run: func(ws memdb.WatchSet, state *state.StateStore) error { // check if user has permission to all namespaces - allowedNSes, err := allowedNSes(aclObj, state) + allowedNSes, err := allowedNSes(aclObj, state, allow) if err == structs.ErrPermissionDenied { // return empty jobs if token isn't authorized for any // namespace, matching other endpoints diff --git a/nomad/mock/mock.go b/nomad/mock/mock.go index 6e412b165..e8c555a10 100644 --- a/nomad/mock/mock.go +++ b/nomad/mock/mock.go @@ -1380,9 +1380,7 @@ func ScalingPolicy() *structs.ScalingPolicy { Policy: map[string]interface{}{ "a": "b", }, - Enabled: true, - CreateIndex: 10, - ModifyIndex: 20, + Enabled: true, } } diff --git a/nomad/scaling_endpoint.go b/nomad/scaling_endpoint.go index 89e1aa0a6..98da2ac53 100644 --- a/nomad/scaling_endpoint.go +++ b/nomad/scaling_endpoint.go @@ -1,6 +1,7 @@ package nomad import ( + "strings" "time" metrics "github.com/armon/go-metrics" @@ -8,6 +9,7 @@ import ( memdb "github.com/hashicorp/go-memdb" "github.com/hashicorp/nomad/acl" + "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/structs" ) @@ -19,16 +21,18 @@ type Scaling struct { } // ListPolicies is used to list the policies -func (a *Scaling) ListPolicies(args *structs.ScalingPolicyListRequest, - reply *structs.ScalingPolicyListResponse) error { +func (p *Scaling) ListPolicies(args *structs.ScalingPolicyListRequest, reply *structs.ScalingPolicyListResponse) error { - if done, err := a.srv.forward("Scaling.ListPolicies", args, args, reply); done { + if done, err := p.srv.forward("Scaling.ListPolicies", args, args, reply); done { return err } defer metrics.MeasureSince([]string{"nomad", "scaling", "list_policies"}, time.Now()) - // Check for list-job permissions - if aclObj, err := a.srv.ResolveToken(args.AuthToken); err != nil { + if args.RequestNamespace() == structs.AllNamespacesSentinel { + return p.listAllNamespaces(args, reply) + } + + if aclObj, err := p.srv.ResolveToken(args.AuthToken); err != nil { return err } else if aclObj != nil { hasListScalingPolicies := aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilityListScalingPolicies) @@ -45,22 +49,24 @@ func (a *Scaling) ListPolicies(args *structs.ScalingPolicyListRequest, queryMeta: &reply.QueryMeta, run: func(ws memdb.WatchSet, state *state.StateStore) error { // Iterate over all the policies - iter, err := state.ScalingPoliciesByNamespace(ws, args.Namespace) + var err error + var iter memdb.ResultIterator + if prefix := args.QueryOptions.Prefix; prefix != "" { + iter, err = state.ScalingPoliciesByIDPrefix(ws, args.RequestNamespace(), prefix) + } else if job := args.Job; job != "" { + iter, err = state.ScalingPoliciesByJob(ws, args.RequestNamespace(), job) + } else { + iter, err = state.ScalingPoliciesByNamespace(ws, args.Namespace, args.Type) + } + if err != nil { return err } // Convert all the policies to a list stub reply.Policies = nil - for { - raw := iter.Next() - if raw == nil { - break - } + for raw := iter.Next(); raw != nil; raw = iter.Next() { policy := raw.(*structs.ScalingPolicy) - // if _, ok := policies[policy.Target]; ok || mgt { - // reply.Policies = append(reply.Policies, policy.Stub()) - // } reply.Policies = append(reply.Policies, policy.Stub()) } @@ -70,28 +76,27 @@ func (a *Scaling) ListPolicies(args *structs.ScalingPolicyListRequest, return err } - // Ensure we never set the index to zero, otherwise a blocking query cannot be used. - // We floor the index at one, since realistically the first write must have a higher index. + // Don't return index zero, otherwise a blocking query cannot be used. if index == 0 { index = 1 } reply.Index = index return nil }} - return a.srv.blockingRPC(&opts) + return p.srv.blockingRPC(&opts) } // GetPolicy is used to get a specific policy -func (a *Scaling) GetPolicy(args *structs.ScalingPolicySpecificRequest, +func (p *Scaling) GetPolicy(args *structs.ScalingPolicySpecificRequest, reply *structs.SingleScalingPolicyResponse) error { - if done, err := a.srv.forward("Scaling.GetPolicy", args, args, reply); done { + if done, err := p.srv.forward("Scaling.GetPolicy", args, args, reply); done { return err } defer metrics.MeasureSince([]string{"nomad", "scaling", "get_policy"}, time.Now()) // Check for list-job permissions - if aclObj, err := a.srv.ResolveToken(args.AuthToken); err != nil { + if aclObj, err := p.srv.ResolveToken(args.AuthToken); err != nil { return err } else if aclObj != nil { hasReadScalingPolicy := aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilityReadScalingPolicy) @@ -129,5 +134,71 @@ func (a *Scaling) GetPolicy(args *structs.ScalingPolicySpecificRequest, reply.Index = index return nil }} - return a.srv.blockingRPC(&opts) + return p.srv.blockingRPC(&opts) +} + +func (j *Scaling) listAllNamespaces(args *structs.ScalingPolicyListRequest, reply *structs.ScalingPolicyListResponse) error { + // Check for list-job permissions + aclObj, err := j.srv.ResolveToken(args.AuthToken) + if err != nil { + return err + } + prefix := args.QueryOptions.Prefix + allow := func(ns string) bool { + return aclObj.AllowNsOp(ns, acl.NamespaceCapabilityListScalingPolicies) || + (aclObj.AllowNsOp(ns, acl.NamespaceCapabilityListJobs) && aclObj.AllowNsOp(ns, acl.NamespaceCapabilityReadJob)) + } + + // Setup the blocking query + opts := blockingOptions{ + queryOpts: &args.QueryOptions, + queryMeta: &reply.QueryMeta, + run: func(ws memdb.WatchSet, state *state.StateStore) error { + // check if user has permission to all namespaces + allowedNSes, err := allowedNSes(aclObj, state, allow) + if err == structs.ErrPermissionDenied { + // return empty if token isn't authorized for any namespace + reply.Policies = []*structs.ScalingPolicyListStub{} + return nil + } else if err != nil { + return err + } + + // Capture all the policies + var iter memdb.ResultIterator + if args.Type != "" { + iter, err = state.ScalingPoliciesByTypePrefix(ws, args.Type) + } else { + iter, err = state.ScalingPolicies(ws) + } + if err != nil { + return err + } + + var policies []*structs.ScalingPolicyListStub + for raw := iter.Next(); raw != nil; raw = iter.Next() { + policy := raw.(*structs.ScalingPolicy) + if allowedNSes != nil && !allowedNSes[policy.Target[structs.ScalingTargetNamespace]] { + // not permitted to this name namespace + continue + } + if prefix != "" && !strings.HasPrefix(policy.ID, prefix) { + continue + } + policies = append(policies, policy.Stub()) + } + reply.Policies = policies + + // Use the last index that affected the policies table or summary + index, err := state.Index("scaling_policy") + if err != nil { + return err + } + reply.Index = helper.Uint64Max(1, index) + + // Set the query response + j.srv.setQueryMeta(&reply.QueryMeta) + return nil + }} + return j.srv.blockingRPC(&opts) } diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 3fbc59fb1..d5e5f12f9 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -5,6 +5,7 @@ import ( "fmt" "reflect" "sort" + "strings" "time" log "github.com/hashicorp/go-hclog" @@ -1527,6 +1528,10 @@ func (s *StateStore) upsertJobImpl(index uint64, job *structs.Job, keepVersion b return fmt.Errorf("unable to update job scaling policies: %v", err) } + if err := s.updateJobRecommendations(index, txn, existingJob, job); err != nil { + return fmt.Errorf("unable to update job recommendations: %v", err) + } + if err := s.updateJobCSIPlugins(index, job, existingJob, txn); err != nil { return fmt.Errorf("unable to update job scaling policies: %v", err) } @@ -1644,6 +1649,11 @@ func (s *StateStore) DeleteJobTxn(index uint64, namespace, jobID string, txn Txn return fmt.Errorf("deleting job scaling policies failed: %v", err) } + // Delete any job recommendations + if err := s.deleteRecommendationsByJob(index, txn, job); err != nil { + return fmt.Errorf("deleting job recommendatons failed: %v", err) + } + // Delete the scaling events if _, err = txn.DeleteAll("scaling_event", "id", namespace, jobID); err != nil { return fmt.Errorf("deleting job scaling events failed: %v", err) @@ -4567,17 +4577,10 @@ func (s *StateStore) updateJobScalingPolicies(index uint64, job *structs.Job, tx ws := memdb.NewWatchSet() - if job.Stop { - if err := s.deleteJobScalingPolicies(index, job, txn); err != nil { - return fmt.Errorf("deleting job scaling policies failed: %v", err) - } - return nil - } - scalingPolicies := job.GetScalingPolicies() - newTargets := map[string]struct{}{} + newTargets := map[string]bool{} for _, p := range scalingPolicies { - newTargets[p.Target[structs.ScalingTargetGroup]] = struct{}{} + newTargets[p.JobKey()] = true } // find existing policies that need to be deleted deletedPolicies := []string{} @@ -4585,13 +4588,9 @@ func (s *StateStore) updateJobScalingPolicies(index uint64, job *structs.Job, tx if err != nil { return fmt.Errorf("ScalingPoliciesByJob lookup failed: %v", err) } - for { - raw := iter.Next() - if raw == nil { - break - } + for raw := iter.Next(); raw != nil; raw = iter.Next() { oldPolicy := raw.(*structs.ScalingPolicy) - if _, ok := newTargets[oldPolicy.Target[structs.ScalingTargetGroup]]; !ok { + if !newTargets[oldPolicy.JobKey()] { deletedPolicies = append(deletedPolicies, oldPolicy.ID) } } @@ -5460,12 +5459,11 @@ func (s *StateStore) UpsertScalingPoliciesTxn(index uint64, scalingPolicies []*s } policy.ID = existing.ID policy.CreateIndex = existing.CreateIndex - policy.ModifyIndex = index } else { // policy.ID must have been set already in Job.Register before log apply policy.CreateIndex = index - policy.ModifyIndex = index } + policy.ModifyIndex = index // Insert the scaling policy hadUpdates = true @@ -5474,7 +5472,7 @@ func (s *StateStore) UpsertScalingPoliciesTxn(index uint64, scalingPolicies []*s } } - // Update the indexes table for scaling policy + // Update the indexes table for scaling policy if we updated any policies if hadUpdates { if err := txn.Insert("index", &IndexEntry{"scaling_policy", index}); err != nil { return fmt.Errorf("index update failed: %v", err) @@ -5740,19 +5738,6 @@ func (s *StateStore) ScalingPolicies(ws memdb.WatchSet) (memdb.ResultIterator, e return iter, nil } -// ScalingPoliciesByType returns an iterator over scaling policies of a certain type. -func (s *StateStore) ScalingPoliciesByType(ws memdb.WatchSet, t string) (memdb.ResultIterator, error) { - txn := s.db.ReadTxn() - - iter, err := txn.Get("scaling_policy", "type", t) - if err != nil { - return nil, err - } - - ws.Add(iter.WatchCh()) - return iter, nil -} - // ScalingPoliciesByTypePrefix returns an iterator over scaling policies with a certain type prefix. func (s *StateStore) ScalingPoliciesByTypePrefix(ws memdb.WatchSet, t string) (memdb.ResultIterator, error) { txn := s.db.ReadTxn() @@ -5766,7 +5751,7 @@ func (s *StateStore) ScalingPoliciesByTypePrefix(ws memdb.WatchSet, t string) (m return iter, nil } -func (s *StateStore) ScalingPoliciesByNamespace(ws memdb.WatchSet, namespace string) (memdb.ResultIterator, error) { +func (s *StateStore) ScalingPoliciesByNamespace(ws memdb.WatchSet, namespace, typ string) (memdb.ResultIterator, error) { txn := s.db.ReadTxn() iter, err := txn.Get("scaling_policy", "target_prefix", namespace) @@ -5775,18 +5760,22 @@ func (s *StateStore) ScalingPoliciesByNamespace(ws memdb.WatchSet, namespace str } ws.Add(iter.WatchCh()) - filter := func(raw interface{}) bool { - d, ok := raw.(*structs.ScalingPolicy) - if !ok { - return true - } - return d.Target[structs.ScalingTargetNamespace] != namespace + // Wrap the iterator in a filter to exact match the namespace + iter = memdb.NewFilterIterator(iter, scalingPolicyNamespaceFilter(namespace)) + + // If policy type is specified as well, wrap again + if typ != "" { + iter = memdb.NewFilterIterator(iter, func(raw interface{}) bool { + p, ok := raw.(*structs.ScalingPolicy) + if !ok { + return true + } + return !strings.HasPrefix(p.Type, typ) + }) } - // Wrap the iterator in a filter - wrap := memdb.NewFilterIterator(iter, filter) - return wrap, nil + return iter, nil } func (s *StateStore) ScalingPoliciesByJob(ws memdb.WatchSet, namespace, jobID string) (memdb.ResultIterator, error) { @@ -5834,6 +5823,8 @@ func (s *StateStore) ScalingPolicyByID(ws memdb.WatchSet, id string) (*structs.S return nil, nil } +// ScalingPolicyByTargetAndType returns a fully-qualified policy against a target and policy type, +// or nil if it does not exist. This method does not honor the watchset on the policy type, just the target. func (s *StateStore) ScalingPolicyByTargetAndType(ws memdb.WatchSet, target map[string]string, typ string) (*structs.ScalingPolicy, error) { txn := s.db.ReadTxn() @@ -5847,6 +5838,7 @@ func (s *StateStore) ScalingPolicyByTargetAndType(ws memdb.WatchSet, target map[ if err != nil { return nil, fmt.Errorf("scaling_policy lookup failed: %v", err) } + ws.Add(it.WatchCh()) // Check for type @@ -5866,6 +5858,34 @@ func (s *StateStore) ScalingPolicyByTargetAndType(ws memdb.WatchSet, target map[ return nil, nil } +func (s *StateStore) ScalingPoliciesByIDPrefix(ws memdb.WatchSet, namespace string, prefix string) (memdb.ResultIterator, error) { + txn := s.db.ReadTxn() + + iter, err := txn.Get("scaling_policy", "id_prefix", prefix) + if err != nil { + return nil, fmt.Errorf("scaling policy lookup failed: %v", err) + } + + ws.Add(iter.WatchCh()) + + iter = memdb.NewFilterIterator(iter, scalingPolicyNamespaceFilter(namespace)) + + return iter, nil +} + +// scalingPolicyNamespaceFilter returns a filter function that filters all +// scaling policies not targeting the given namespace. +func scalingPolicyNamespaceFilter(namespace string) func(interface{}) bool { + return func(raw interface{}) bool { + p, ok := raw.(*structs.ScalingPolicy) + if !ok { + return true + } + + return p.Target[structs.ScalingTargetNamespace] != namespace + } +} + func (s *StateStore) EventSinks(ws memdb.WatchSet) (memdb.ResultIterator, error) { txn := s.db.ReadTxn() @@ -6216,6 +6236,7 @@ func (r *StateRestore) CSIVolumeRestore(volume *structs.CSIVolume) error { return nil } +// ScalingEventsRestore is used to restore scaling events for a job func (r *StateRestore) ScalingEventsRestore(jobEvents *structs.JobScalingEvents) error { if err := r.txn.Insert("scaling_event", jobEvents); err != nil { return fmt.Errorf("scaling event insert failed: %v", err) diff --git a/nomad/state/state_store_oss.go b/nomad/state/state_store_oss.go index c161caf81..e8f91195e 100644 --- a/nomad/state/state_store_oss.go +++ b/nomad/state/state_store_oss.go @@ -20,3 +20,13 @@ func (s *StateStore) quotaReconcile(index uint64, txn *txn, newQuota, oldQuota s func (s *StateStore) updateEntWithAlloc(index uint64, new, existing *structs.Allocation, txn *txn) error { return nil } + +// deleteRecommendationsByJob deletes all recommendations for the specified job +func (s *StateStore) deleteRecommendationsByJob(index uint64, txn Txn, job *structs.Job) error { + return nil +} + +// updateJobRecommendations updates/deletes job recommendations as necessary for a job update +func (s *StateStore) updateJobRecommendations(index uint64, txn Txn, prevJob, newJob *structs.Job) error { + return nil +} diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index 3045da7a5..8483f9ace 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -8632,12 +8632,12 @@ func TestStateStore_UpsertScalingPolicy_Namespace(t *testing.T) { policy2.Target[structs.ScalingTargetNamespace] = otherNamespace ws1 := memdb.NewWatchSet() - iter, err := state.ScalingPoliciesByNamespace(ws1, structs.DefaultNamespace) + iter, err := state.ScalingPoliciesByNamespace(ws1, structs.DefaultNamespace, "") require.NoError(err) require.Nil(iter.Next()) ws2 := memdb.NewWatchSet() - iter, err = state.ScalingPoliciesByNamespace(ws2, otherNamespace) + iter, err = state.ScalingPoliciesByNamespace(ws2, otherNamespace, "") require.NoError(err) require.Nil(iter.Next()) @@ -8646,7 +8646,7 @@ func TestStateStore_UpsertScalingPolicy_Namespace(t *testing.T) { require.True(watchFired(ws1)) require.True(watchFired(ws2)) - iter, err = state.ScalingPoliciesByNamespace(nil, structs.DefaultNamespace) + iter, err = state.ScalingPoliciesByNamespace(nil, structs.DefaultNamespace, "") require.NoError(err) policiesInDefaultNamespace := []string{} for { @@ -8658,7 +8658,7 @@ func TestStateStore_UpsertScalingPolicy_Namespace(t *testing.T) { } require.ElementsMatch([]string{policy.ID}, policiesInDefaultNamespace) - iter, err = state.ScalingPoliciesByNamespace(nil, otherNamespace) + iter, err = state.ScalingPoliciesByNamespace(nil, otherNamespace, "") require.NoError(err) policiesInOtherNamespace := []string{} for { @@ -8684,12 +8684,12 @@ func TestStateStore_UpsertScalingPolicy_Namespace_PrefixBug(t *testing.T) { policy2.Target[structs.ScalingTargetNamespace] = ns2 ws1 := memdb.NewWatchSet() - iter, err := state.ScalingPoliciesByNamespace(ws1, ns1) + iter, err := state.ScalingPoliciesByNamespace(ws1, ns1, "") require.NoError(err) require.Nil(iter.Next()) ws2 := memdb.NewWatchSet() - iter, err = state.ScalingPoliciesByNamespace(ws2, ns2) + iter, err = state.ScalingPoliciesByNamespace(ws2, ns2, "") require.NoError(err) require.Nil(iter.Next()) @@ -8698,7 +8698,7 @@ func TestStateStore_UpsertScalingPolicy_Namespace_PrefixBug(t *testing.T) { require.True(watchFired(ws1)) require.True(watchFired(ws2)) - iter, err = state.ScalingPoliciesByNamespace(nil, ns1) + iter, err = state.ScalingPoliciesByNamespace(nil, ns1, "") require.NoError(err) policiesInNS1 := []string{} for { @@ -8710,7 +8710,7 @@ func TestStateStore_UpsertScalingPolicy_Namespace_PrefixBug(t *testing.T) { } require.ElementsMatch([]string{policy1.ID}, policiesInNS1) - iter, err = state.ScalingPoliciesByNamespace(nil, ns2) + iter, err = state.ScalingPoliciesByNamespace(nil, ns2, "") require.NoError(err) policiesInNS2 := []string{} for { @@ -8723,37 +8723,6 @@ func TestStateStore_UpsertScalingPolicy_Namespace_PrefixBug(t *testing.T) { require.ElementsMatch([]string{policy2.ID}, policiesInNS2) } -func TestStateStore_UpsertJob_UpsertScalingPolicies(t *testing.T) { - t.Parallel() - - require := require.New(t) - - state := testStateStore(t) - job, policy := mock.JobWithScalingPolicy() - - // Create a watchset so we can test that upsert fires the watch - ws := memdb.NewWatchSet() - out, err := state.ScalingPolicyByTargetAndType(ws, policy.Target, policy.Type) - require.NoError(err) - require.Nil(out) - - var newIndex uint64 = 1000 - err = state.UpsertJob(structs.MsgTypeTestSetup, newIndex, job) - require.NoError(err) - require.True(watchFired(ws), "watch did not fire") - - ws = memdb.NewWatchSet() - out, err = state.ScalingPolicyByTargetAndType(ws, policy.Target, policy.Type) - require.NoError(err) - require.NotNil(out) - require.Equal(newIndex, out.CreateIndex) - require.Equal(newIndex, out.ModifyIndex) - - index, err := state.Index("scaling_policy") - require.NoError(err) - require.Equal(newIndex, index) -} - // Scaling Policy IDs are generated randomly during Job.Register // Subsequent updates of the job should preserve the ID for the scaling policy // associated with a given target. @@ -8883,7 +8852,7 @@ func TestStateStore_DeleteScalingPolicies(t *testing.T) { require.Nil(out) // Ensure we see both policies - iter, err := state.ScalingPoliciesByNamespace(ws, policy.Target[structs.ScalingTargetNamespace]) + iter, err := state.ScalingPoliciesByNamespace(ws, policy.Target[structs.ScalingTargetNamespace], "") require.NoError(err) count := 0 for { @@ -8967,13 +8936,12 @@ func TestStateStore_UnstopJob_UpsertScalingPolicies(t *testing.T) { // upsert a stopped job, verify that we don't fire the watcher or add any scaling policies err = state.UpsertJob(structs.MsgTypeTestSetup, 1000, job) require.NoError(err) - require.False(watchFired(ws)) - // stopped job should have no scaling policies, watcher doesn't fire + require.True(watchFired(ws)) list, err = state.ScalingPolicies(ws) require.NoError(err) - require.Nil(list.Next()) + require.NotNil(list.Next()) - // Establish a new watcher + // Establish a new watchset ws = memdb.NewWatchSet() _, err = state.ScalingPolicies(ws) require.NoError(err) @@ -8982,14 +8950,14 @@ func TestStateStore_UnstopJob_UpsertScalingPolicies(t *testing.T) { err = state.UpsertJob(structs.MsgTypeTestSetup, 1100, job) require.NoError(err) - // Ensure the scaling policy was added, watch was fired, index was advanced - require.True(watchFired(ws)) + // Ensure the scaling policy still exists, watch was not fired, index was not advanced out, err := state.ScalingPolicyByTargetAndType(nil, policy.Target, policy.Type) require.NoError(err) require.NotNil(out) index, err := state.Index("scaling_policy") require.NoError(err) - require.GreaterOrEqual(index, uint64(1100)) + require.EqualValues(index, 1000) + require.False(watchFired(ws)) } func TestStateStore_DeleteJob_DeleteScalingPolicies(t *testing.T) { @@ -9105,15 +9073,12 @@ func TestStateStore_ScalingPoliciesByType(t *testing.T) { pOther2.Type = "other-type-2" // Create search routine - search := func(t string) (count int, found []string, err error) { + search := func(t string) (found []string) { found = []string{} - iter, err := state.ScalingPoliciesByType(nil, t) - if err != nil { - return - } + iter, err := state.ScalingPoliciesByTypePrefix(nil, t) + require.NoError(err) for raw := iter.Next(); raw != nil; raw = iter.Next() { - count++ found = append(found, raw.(*structs.ScalingPolicy).Type) } return @@ -9126,47 +9091,23 @@ func TestStateStore_ScalingPoliciesByType(t *testing.T) { // Check if we can read horizontal policies expect := []string{pHorzA.Type, pHorzB.Type} - count, found, err := search(structs.ScalingPolicyTypeHorizontal) - - sort.Strings(found) - sort.Strings(expect) - - require.NoError(err) - require.Equal(expect, found) - require.Equal(2, count) + actual := search(structs.ScalingPolicyTypeHorizontal) + require.ElementsMatch(expect, actual) // Check if we can read policies of other types expect = []string{pOther1.Type} - count, found, err = search("other-type-1") + actual = search("other-type-1") + require.ElementsMatch(expect, actual) - sort.Strings(found) - sort.Strings(expect) - - require.NoError(err) - require.Equal(expect, found) - require.Equal(1, count) - - // Check if we can't read policies by prefix - expect = []string{} - count, found, err = search("other-type") - - sort.Strings(found) - sort.Strings(expect) - - require.NoError(err) - require.Equal(expect, found) - require.Equal(0, count) + // Check that we can read policies by prefix + expect = []string{"other-type-1", "other-type-2"} + actual = search("other-type") + require.Equal(expect, actual) // Check for empty result expect = []string{} - count, found, err = search("non-existing") - - sort.Strings(found) - sort.Strings(expect) - - require.NoError(err) - require.Equal(expect, found) - require.Equal(0, count) + actual = search("non-existing") + require.ElementsMatch(expect, actual) } func TestStateStore_ScalingPoliciesByTypePrefix(t *testing.T) { diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 501b4a4bc..481b157ba 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -1181,6 +1181,8 @@ type SingleScalingPolicyResponse struct { // ScalingPolicyListRequest is used to parameterize a scaling policy list request type ScalingPolicyListRequest struct { + Job string + Type string QueryOptions } @@ -5273,6 +5275,14 @@ type ScalingPolicy struct { ModifyIndex uint64 } +// JobKey returns a key that is unique to a job-scoped target, useful as a map +// key. This uses the policy type, plus target (group and task). +func (p *ScalingPolicy) JobKey() string { + return p.Type + "\000" + + p.Target[ScalingTargetGroup] + "\000" + + p.Target[ScalingTargetTask] +} + const ( ScalingTargetNamespace = "Namespace" ScalingTargetJob = "Job" @@ -5376,6 +5386,7 @@ func (p *ScalingPolicy) Diff(p2 *ScalingPolicy) bool { return !reflect.DeepEqual(*p, copy) } +// TarketTaskGroup updates a ScalingPolicy target to specify a given task group func (p *ScalingPolicy) TargetTaskGroup(job *Job, tg *TaskGroup) *ScalingPolicy { p.Target = map[string]string{ ScalingTargetNamespace: job.Namespace, @@ -5385,6 +5396,13 @@ func (p *ScalingPolicy) TargetTaskGroup(job *Job, tg *TaskGroup) *ScalingPolicy return p } +// TargetTask updates a ScalingPolicy target to specify a given task +func (p *ScalingPolicy) TargetTask(job *Job, tg *TaskGroup, task *Task) *ScalingPolicy { + p.TargetTaskGroup(job, tg) + p.Target[ScalingTargetTask] = task.Name + return p +} + func (p *ScalingPolicy) Stub() *ScalingPolicyListStub { stub := &ScalingPolicyListStub{ ID: p.ID, @@ -5410,6 +5428,8 @@ func (j *Job) GetScalingPolicies() []*ScalingPolicy { } } + ret = append(ret, j.GetEntScalingPolicies()...) + return ret } @@ -6541,7 +6561,8 @@ type Task struct { // attached to this task. VolumeMounts []*VolumeMount - // The kill signal to use for the task. This is an optional specification, + // ScalingPolicies is a list of scaling policies scoped to this task + ScalingPolicies []*ScalingPolicy // KillSignal is the kill signal to use for the task. This is an optional // specification and defaults to SIGINT diff --git a/nomad/structs/structs_oss.go b/nomad/structs/structs_oss.go index 20f71adec..ec540a82c 100644 --- a/nomad/structs/structs_oss.go +++ b/nomad/structs/structs_oss.go @@ -31,3 +31,7 @@ func (p *ScalingPolicy) validateType() multierror.Error { return mErr } + +func (j *Job) GetEntScalingPolicies() []*ScalingPolicy { + return nil +} diff --git a/tools/go.sum b/tools/go.sum index 8b9149ccf..427f55f98 100644 --- a/tools/go.sum +++ b/tools/go.sum @@ -5,8 +5,6 @@ github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAE github.com/OpenPeeDeeP/depguard v1.0.1 h1:VlW4R6jmBIv3/u1JNlawEvJMM4J+dPORPaZasQee8Us= github.com/OpenPeeDeeP/depguard v1.0.1/go.mod h1:xsIw86fROiiwelg+jB2uM9PiKihMMmUx/1V+TNhjQvM= github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg= -github.com/a8m/tree v0.0.0-20201019170308-9f4249a434f8 h1:CkFIJJAKEbZbM2tKmCqt/v9ivgpikjPu5lnDsk8huLE= -github.com/a8m/tree v0.0.0-20201019170308-9f4249a434f8/go.mod h1:FSdwKX97koS5efgm8WevNf7XS3PqtyFkKDDXrz778cg= github.com/a8m/tree v0.0.0-20201026183218-fce18e2a750e h1:8YO27VG7yrHhATTepO2FYLbGUB1wfYbkqoKiVwaAQ+Q= github.com/a8m/tree v0.0.0-20201026183218-fce18e2a750e/go.mod h1:FSdwKX97koS5efgm8WevNf7XS3PqtyFkKDDXrz778cg= github.com/agext/levenshtein v1.2.1 h1:QmvMAjj2aEICytGiWzmxoE0x2KZvE0fvmqMOfy2tjT8= diff --git a/vendor/github.com/hashicorp/nomad/api/recommendations.go b/vendor/github.com/hashicorp/nomad/api/recommendations.go new file mode 100644 index 000000000..47414f24b --- /dev/null +++ b/vendor/github.com/hashicorp/nomad/api/recommendations.go @@ -0,0 +1,123 @@ +package api + +// Recommendations is used to query the recommendations endpoints. +type Recommendations struct { + client *Client +} + +// Recommendations returns a new handle on the recommendations endpoints. +func (c *Client) Recommendations() *Recommendations { + return &Recommendations{client: c} +} + +// List is used to dump all of the recommendations in the cluster +func (r *Recommendations) List(q *QueryOptions) ([]*Recommendation, *QueryMeta, error) { + var resp []*Recommendation + qm, err := r.client.query("/v1/recommendations", &resp, q) + if err != nil { + return nil, qm, err + } + return resp, qm, nil +} + +// Info is used to return information on a single recommendation +func (r *Recommendations) Info(id string, q *QueryOptions) (*Recommendation, *QueryMeta, error) { + var resp Recommendation + qm, err := r.client.query("/v1/recommendation/"+id, &resp, q) + if err != nil { + return nil, nil, err + } + return &resp, qm, nil +} + +// Upsert is used to create or update a recommendation +func (r *Recommendations) Upsert(rec *Recommendation, q *WriteOptions) (*Recommendation, *WriteMeta, error) { + var resp Recommendation + wm, err := r.client.write("/v1/recommendation", rec, &resp, q) + if err != nil { + return nil, nil, err + } + return &resp, wm, nil +} + +// Delete is used to delete a list of recommendations +func (r *Recommendations) Delete(ids []string, q *WriteOptions) (*WriteMeta, error) { + req := &RecommendationApplyRequest{ + Apply: []string{}, + Dismiss: ids, + } + wm, err := r.client.write("/v1/recommendations/apply", req, nil, q) + if err != nil { + return nil, err + } + return wm, nil +} + +// Apply is used to apply a set of recommendations +func (r *Recommendations) Apply(ids []string, policyOverride bool) ( + *RecommendationApplyResponse, *WriteMeta, error) { + req := &RecommendationApplyRequest{ + Apply: ids, + PolicyOverride: policyOverride, + } + var resp RecommendationApplyResponse + wm, err := r.client.write("/v1/recommendations/apply", req, &resp, nil) + if err != nil { + return nil, nil, err + } + resp.WriteMeta = *wm + return &resp, wm, nil +} + +// Recommendation is used to serialize a recommendation. +type Recommendation struct { + ID string + Region string + Namespace string + JobID string + JobVersion uint64 + Group string + Task string + Resource string + Value int + Current int + Meta map[string]interface{} + Stats map[string]float64 + EnforceVersion bool + + SubmitTime int64 + + CreateIndex uint64 + ModifyIndex uint64 +} + +// RecommendationApplyRequest is used to apply and/or dismiss a set of recommendations +type RecommendationApplyRequest struct { + Apply []string + Dismiss []string + PolicyOverride bool +} + +// RecommendationApplyResponse is used to apply a set of recommendations +type RecommendationApplyResponse struct { + UpdatedJobs []*SingleRecommendationApplyResult + Errors []*SingleRecommendationApplyError + WriteMeta +} + +type SingleRecommendationApplyResult struct { + Namespace string + JobID string + JobModifyIndex uint64 + EvalID string + EvalCreateIndex uint64 + Warnings string + Recommendations []string +} + +type SingleRecommendationApplyError struct { + Namespace string + JobID string + Recommendations []string + Error string +} diff --git a/vendor/github.com/hashicorp/nomad/api/scaling.go b/vendor/github.com/hashicorp/nomad/api/scaling.go index 56a0a964a..de889172d 100644 --- a/vendor/github.com/hashicorp/nomad/api/scaling.go +++ b/vendor/github.com/hashicorp/nomad/api/scaling.go @@ -66,12 +66,12 @@ type ScalingPolicy struct { Max *int64 `hcl:"max,optional"` Policy map[string]interface{} `hcl:"policy,block"` Enabled *bool `hcl:"enabled,optional"` + Type string `hcl:"type,optional"` /* fields set by server */ ID string Namespace string - Type string Target map[string]string CreateIndex uint64 ModifyIndex uint64 diff --git a/vendor/github.com/hashicorp/nomad/api/tasks.go b/vendor/github.com/hashicorp/nomad/api/tasks.go index b331ff6b0..2406a9bc3 100644 --- a/vendor/github.com/hashicorp/nomad/api/tasks.go +++ b/vendor/github.com/hashicorp/nomad/api/tasks.go @@ -671,6 +671,7 @@ type Task struct { ShutdownDelay time.Duration `mapstructure:"shutdown_delay" hcl:"shutdown_delay,optional"` KillSignal string `mapstructure:"kill_signal" hcl:"kill_signal,optional"` Kind string `hcl:"kind,optional"` + ScalingPolicies []*ScalingPolicy `hcl:"scaling,block"` } func (t *Task) Canonicalize(tg *TaskGroup, job *Job) {