diff --git a/acl/policy.go b/acl/policy.go index 3f92eac97..d1d5dcc4c 100644 --- a/acl/policy.go +++ b/acl/policy.go @@ -24,26 +24,27 @@ const ( // combined we take the union of all capabilities. If the deny capability is present, it // takes precedence and overwrites all other capabilities. - NamespaceCapabilityDeny = "deny" - NamespaceCapabilityListJobs = "list-jobs" - NamespaceCapabilityReadJob = "read-job" - NamespaceCapabilitySubmitJob = "submit-job" - NamespaceCapabilityDispatchJob = "dispatch-job" - NamespaceCapabilityReadLogs = "read-logs" - NamespaceCapabilityReadFS = "read-fs" - NamespaceCapabilityAllocExec = "alloc-exec" - NamespaceCapabilityAllocNodeExec = "alloc-node-exec" - NamespaceCapabilityAllocLifecycle = "alloc-lifecycle" - NamespaceCapabilitySentinelOverride = "sentinel-override" - NamespaceCapabilityCSIRegisterPlugin = "csi-register-plugin" - NamespaceCapabilityCSIWriteVolume = "csi-write-volume" - NamespaceCapabilityCSIReadVolume = "csi-read-volume" - NamespaceCapabilityCSIListVolume = "csi-list-volume" - NamespaceCapabilityCSIMountVolume = "csi-mount-volume" - NamespaceCapabilityListScalingPolicies = "list-scaling-policies" - NamespaceCapabilityReadScalingPolicy = "read-scaling-policy" - NamespaceCapabilityReadJobScaling = "read-job-scaling" - NamespaceCapabilityScaleJob = "scale-job" + NamespaceCapabilityDeny = "deny" + NamespaceCapabilityListJobs = "list-jobs" + NamespaceCapabilityReadJob = "read-job" + NamespaceCapabilitySubmitJob = "submit-job" + NamespaceCapabilityDispatchJob = "dispatch-job" + NamespaceCapabilityReadLogs = "read-logs" + NamespaceCapabilityReadFS = "read-fs" + NamespaceCapabilityAllocExec = "alloc-exec" + NamespaceCapabilityAllocNodeExec = "alloc-node-exec" + NamespaceCapabilityAllocLifecycle = "alloc-lifecycle" + NamespaceCapabilitySentinelOverride = "sentinel-override" + NamespaceCapabilityCSIRegisterPlugin = "csi-register-plugin" + NamespaceCapabilityCSIWriteVolume = "csi-write-volume" + NamespaceCapabilityCSIReadVolume = "csi-read-volume" + NamespaceCapabilityCSIListVolume = "csi-list-volume" + NamespaceCapabilityCSIMountVolume = "csi-mount-volume" + NamespaceCapabilityListScalingPolicies = "list-scaling-policies" + NamespaceCapabilityReadScalingPolicy = "read-scaling-policy" + NamespaceCapabilityReadJobScaling = "read-job-scaling" + NamespaceCapabilityScaleJob = "scale-job" + NamespaceCapabilitySubmitRecommendation = "submit-recommendation" ) var ( @@ -153,7 +154,7 @@ func isNamespaceCapabilityValid(cap string) bool { NamespaceCapabilityListScalingPolicies, NamespaceCapabilityReadScalingPolicy, NamespaceCapabilityReadJobScaling, NamespaceCapabilityScaleJob: return true // Separate the enterprise-only capabilities - case NamespaceCapabilitySentinelOverride: + case NamespaceCapabilitySentinelOverride, NamespaceCapabilitySubmitRecommendation: return true default: return false @@ -183,6 +184,7 @@ func expandNamespacePolicy(policy string) []string { NamespaceCapabilityAllocLifecycle, NamespaceCapabilityCSIMountVolume, NamespaceCapabilityCSIWriteVolume, + NamespaceCapabilitySubmitRecommendation, }...) switch policy { diff --git a/acl/policy_test.go b/acl/policy_test.go index ffb816b1f..60e4615ea 100644 --- a/acl/policy_test.go +++ b/acl/policy_test.go @@ -106,6 +106,7 @@ func TestParse(t *testing.T) { NamespaceCapabilityAllocLifecycle, NamespaceCapabilityCSIMountVolume, NamespaceCapabilityCSIWriteVolume, + NamespaceCapabilitySubmitRecommendation, }, }, { diff --git a/api/recommendations.go b/api/recommendations.go new file mode 100644 index 000000000..47414f24b --- /dev/null +++ b/api/recommendations.go @@ -0,0 +1,123 @@ +package api + +// Recommendations is used to query the recommendations endpoints. +type Recommendations struct { + client *Client +} + +// Recommendations returns a new handle on the recommendations endpoints. +func (c *Client) Recommendations() *Recommendations { + return &Recommendations{client: c} +} + +// List is used to dump all of the recommendations in the cluster +func (r *Recommendations) List(q *QueryOptions) ([]*Recommendation, *QueryMeta, error) { + var resp []*Recommendation + qm, err := r.client.query("/v1/recommendations", &resp, q) + if err != nil { + return nil, qm, err + } + return resp, qm, nil +} + +// Info is used to return information on a single recommendation +func (r *Recommendations) Info(id string, q *QueryOptions) (*Recommendation, *QueryMeta, error) { + var resp Recommendation + qm, err := r.client.query("/v1/recommendation/"+id, &resp, q) + if err != nil { + return nil, nil, err + } + return &resp, qm, nil +} + +// Upsert is used to create or update a recommendation +func (r *Recommendations) Upsert(rec *Recommendation, q *WriteOptions) (*Recommendation, *WriteMeta, error) { + var resp Recommendation + wm, err := r.client.write("/v1/recommendation", rec, &resp, q) + if err != nil { + return nil, nil, err + } + return &resp, wm, nil +} + +// Delete is used to delete a list of recommendations +func (r *Recommendations) Delete(ids []string, q *WriteOptions) (*WriteMeta, error) { + req := &RecommendationApplyRequest{ + Apply: []string{}, + Dismiss: ids, + } + wm, err := r.client.write("/v1/recommendations/apply", req, nil, q) + if err != nil { + return nil, err + } + return wm, nil +} + +// Apply is used to apply a set of recommendations +func (r *Recommendations) Apply(ids []string, policyOverride bool) ( + *RecommendationApplyResponse, *WriteMeta, error) { + req := &RecommendationApplyRequest{ + Apply: ids, + PolicyOverride: policyOverride, + } + var resp RecommendationApplyResponse + wm, err := r.client.write("/v1/recommendations/apply", req, &resp, nil) + if err != nil { + return nil, nil, err + } + resp.WriteMeta = *wm + return &resp, wm, nil +} + +// Recommendation is used to serialize a recommendation. +type Recommendation struct { + ID string + Region string + Namespace string + JobID string + JobVersion uint64 + Group string + Task string + Resource string + Value int + Current int + Meta map[string]interface{} + Stats map[string]float64 + EnforceVersion bool + + SubmitTime int64 + + CreateIndex uint64 + ModifyIndex uint64 +} + +// RecommendationApplyRequest is used to apply and/or dismiss a set of recommendations +type RecommendationApplyRequest struct { + Apply []string + Dismiss []string + PolicyOverride bool +} + +// RecommendationApplyResponse is used to apply a set of recommendations +type RecommendationApplyResponse struct { + UpdatedJobs []*SingleRecommendationApplyResult + Errors []*SingleRecommendationApplyError + WriteMeta +} + +type SingleRecommendationApplyResult struct { + Namespace string + JobID string + JobModifyIndex uint64 + EvalID string + EvalCreateIndex uint64 + Warnings string + Recommendations []string +} + +type SingleRecommendationApplyError struct { + Namespace string + JobID string + Recommendations []string + Error string +} diff --git a/api/scaling.go b/api/scaling.go index 56a0a964a..de889172d 100644 --- a/api/scaling.go +++ b/api/scaling.go @@ -66,12 +66,12 @@ type ScalingPolicy struct { Max *int64 `hcl:"max,optional"` Policy map[string]interface{} `hcl:"policy,block"` Enabled *bool `hcl:"enabled,optional"` + Type string `hcl:"type,optional"` /* fields set by server */ ID string Namespace string - Type string Target map[string]string CreateIndex uint64 ModifyIndex uint64 diff --git a/api/tasks.go b/api/tasks.go index b331ff6b0..2406a9bc3 100644 --- a/api/tasks.go +++ b/api/tasks.go @@ -671,6 +671,7 @@ type Task struct { ShutdownDelay time.Duration `mapstructure:"shutdown_delay" hcl:"shutdown_delay,optional"` KillSignal string `mapstructure:"kill_signal" hcl:"kill_signal,optional"` Kind string `hcl:"kind,optional"` + ScalingPolicies []*ScalingPolicy `hcl:"scaling,block"` } func (t *Task) Canonicalize(tg *TaskGroup, job *Job) { diff --git a/api/util_test.go b/api/util_test.go index 758855750..5c0fd524f 100644 --- a/api/util_test.go +++ b/api/util_test.go @@ -68,6 +68,32 @@ func testPeriodicJob() *Job { return job } +func testRecommendation(job *Job) *Recommendation { + rec := &Recommendation{ + ID: "", + Region: *job.Region, + Namespace: *job.Namespace, + JobID: *job.ID, + Group: *job.TaskGroups[0].Name, + Task: job.TaskGroups[0].Tasks[0].Name, + Resource: "CPU", + Value: *job.TaskGroups[0].Tasks[0].Resources.CPU * 2, + Meta: map[string]interface{}{ + "testing": true, + "mocked": "also true", + }, + Stats: map[string]float64{ + "median": 50.0, + "mean": 51.0, + "max": 75.5, + "99": 73.0, + "min": 0.0, + }, + EnforceVersion: false, + } + return rec +} + func testNamespace() *Namespace { return &Namespace{ Name: "test-namespace", diff --git a/command/agent/http_oss.go b/command/agent/http_oss.go index 49056510b..e90818141 100644 --- a/command/agent/http_oss.go +++ b/command/agent/http_oss.go @@ -17,6 +17,11 @@ func (s *HTTPServer) registerEnterpriseHandlers() { s.mux.HandleFunc("/v1/quota", s.wrap(s.entOnly)) s.mux.HandleFunc("/v1/operator/license", s.wrap(s.entOnly)) + + s.mux.HandleFunc("/v1/recommendation", s.wrap(s.entOnly)) + s.mux.HandleFunc("/v1/recommendations", s.wrap(s.entOnly)) + s.mux.HandleFunc("/v1/recommendations/apply", s.wrap(s.entOnly)) + s.mux.HandleFunc("/v1/recommendation/", s.wrap(s.entOnly)) } func (s *HTTPServer) entOnly(resp http.ResponseWriter, req *http.Request) (interface{}, error) { diff --git a/command/agent/job_endpoint.go b/command/agent/job_endpoint.go index 23e05f637..c4e6d01fd 100644 --- a/command/agent/job_endpoint.go +++ b/command/agent/job_endpoint.go @@ -982,7 +982,7 @@ func ApiTgToStructsTG(job *structs.Job, taskGroup *api.TaskGroup, tg *structs.Ta tg.Tasks = make([]*structs.Task, l) for l, task := range taskGroup.Tasks { t := &structs.Task{} - ApiTaskToStructsTask(task, t) + ApiTaskToStructsTask(job, tg, task, t) // Set the tasks vault namespace from Job if it was not // specified by the task or group @@ -996,7 +996,9 @@ func ApiTgToStructsTG(job *structs.Job, taskGroup *api.TaskGroup, tg *structs.Ta // ApiTaskToStructsTask is a copy and type conversion between the API // representation of a task from a struct representation of a task. -func ApiTaskToStructsTask(apiTask *api.Task, structsTask *structs.Task) { +func ApiTaskToStructsTask(job *structs.Job, group *structs.TaskGroup, + apiTask *api.Task, structsTask *structs.Task) { + structsTask.Name = apiTask.Name structsTask.Driver = apiTask.Driver structsTask.User = apiTask.User @@ -1033,6 +1035,13 @@ func ApiTaskToStructsTask(apiTask *api.Task, structsTask *structs.Task) { } } + if l := len(apiTask.ScalingPolicies); l != 0 { + structsTask.ScalingPolicies = make([]*structs.ScalingPolicy, l) + for i, policy := range apiTask.ScalingPolicies { + structsTask.ScalingPolicies[i] = ApiScalingPolicyToStructs(0, policy).TargetTask(job, group, structsTask) + } + } + if l := len(apiTask.Services); l != 0 { structsTask.Services = make([]*structs.Service, l) for i, service := range apiTask.Services { diff --git a/command/agent/scaling_endpoint.go b/command/agent/scaling_endpoint.go index 3781ff6ab..49a68f6ce 100644 --- a/command/agent/scaling_endpoint.go +++ b/command/agent/scaling_endpoint.go @@ -22,6 +22,12 @@ func (s *HTTPServer) scalingPoliciesListRequest(resp http.ResponseWriter, req *h if s.parse(resp, req, &args.Region, &args.QueryOptions) { return nil, nil } + if job := req.URL.Query().Get("job"); job != "" { + args.Job = job + } + if tpe := req.URL.Query().Get("type"); tpe != "" { + args.Type = tpe + } var out structs.ScalingPolicyListResponse if err := s.agent.RPC("Scaling.ListPolicies", &args, &out); err != nil { @@ -77,9 +83,14 @@ func (s *HTTPServer) scalingPolicyQuery(resp http.ResponseWriter, req *http.Requ func ApiScalingPolicyToStructs(count int, ap *api.ScalingPolicy) *structs.ScalingPolicy { p := structs.ScalingPolicy{ - Enabled: *ap.Enabled, - Policy: ap.Policy, - Target: map[string]string{}, + Type: ap.Type, + Policy: ap.Policy, + Target: map[string]string{}, + } + if ap.Enabled != nil { + p.Enabled = *ap.Enabled + } else { + p.Enabled = true } if ap.Max != nil { p.Max = *ap.Max diff --git a/command/agent/scaling_endpoint_test.go b/command/agent/scaling_endpoint_test.go index 10c3c4186..0abb21c0d 100644 --- a/command/agent/scaling_endpoint_test.go +++ b/command/agent/scaling_endpoint_test.go @@ -12,6 +12,7 @@ import ( ) func TestHTTP_ScalingPoliciesList(t *testing.T) { + require := require.New(t) t.Parallel() httpTest(t, nil, func(s *TestAgent) { for i := 0; i < 3; i++ { @@ -26,40 +27,75 @@ func TestHTTP_ScalingPoliciesList(t *testing.T) { }, } var resp structs.JobRegisterResponse - if err := s.Agent.RPC("Job.Register", &args, &resp); err != nil { - t.Fatalf("err: %v", err) - } + require.NoError(s.Agent.RPC("Job.Register", &args, &resp)) } // Make the HTTP request req, err := http.NewRequest("GET", "/v1/scaling/policies", nil) - if err != nil { - t.Fatalf("err: %v", err) - } + require.NoError(err) + respW := httptest.NewRecorder() // Make the request obj, err := s.Server.ScalingPoliciesRequest(respW, req) - if err != nil { - t.Fatalf("err: %v", err) - } + require.NoError(err) // Check for the index - if respW.Header().Get("X-Nomad-Index") == "" { - t.Fatalf("missing index") - } - if respW.Header().Get("X-Nomad-KnownLeader") != "true" { - t.Fatalf("missing known leader") - } - if respW.Header().Get("X-Nomad-LastContact") == "" { - t.Fatalf("missing last contact") - } + require.NotEmpty(respW.Header().Get("X-Nomad-Index"), "missing index") + require.NotEmpty(respW.Header().Get("X-Nomad-KnownLeader"), "missing known leader") + require.NotEmpty(respW.Header().Get("X-Nomad-LastContact"), "missing last contact") // Check the list l := obj.([]*structs.ScalingPolicyListStub) - if len(l) != 3 { - t.Fatalf("bad: %#v", l) + require.Len(l, 3) + }) +} + +func TestHTTP_ScalingPoliciesList_Filter(t *testing.T) { + t.Parallel() + require := require.New(t) + httpTest(t, nil, func(s *TestAgent) { + var job *structs.Job + for i := 0; i < 3; i++ { + // Create the job + job, _ = mock.JobWithScalingPolicy() + + args := structs.JobRegisterRequest{ + Job: job, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: structs.DefaultNamespace, + }, + } + var resp structs.JobRegisterResponse + require.NoError(s.Agent.RPC("Job.Register", &args, &resp)) } + + // Make the HTTP request + req, err := http.NewRequest("GET", "/v1/scaling/policies?job="+job.ID, nil) + require.NoError(err) + respW := httptest.NewRecorder() + + // Make the request + obj, err := s.Server.ScalingPoliciesRequest(respW, req) + require.NoError(err) + + // Check the list + l := obj.([]*structs.ScalingPolicyListStub) + require.Len(l, 1) + + // Request again, with policy type filter + req, err = http.NewRequest("GET", "/v1/scaling/policies?type=cluster", nil) + require.NoError(err) + respW = httptest.NewRecorder() + + // Make the request + obj, err = s.Server.ScalingPoliciesRequest(respW, req) + require.NoError(err) + + // Check the list + l = obj.([]*structs.ScalingPolicyListStub) + require.Len(l, 0) }) } @@ -90,15 +126,9 @@ func TestHTTP_ScalingPolicyGet(t *testing.T) { require.NoError(err) // Check for the index - if respW.Header().Get("X-Nomad-Index") == "" { - t.Fatalf("missing index") - } - if respW.Header().Get("X-Nomad-KnownLeader") != "true" { - t.Fatalf("missing known leader") - } - if respW.Header().Get("X-Nomad-LastContact") == "" { - t.Fatalf("missing last contact") - } + require.NotEmpty(respW.Header().Get("X-Nomad-Index"), "missing index") + require.NotEmpty(respW.Header().Get("X-Nomad-KnownLeader"), "missing known leader") + require.NotEmpty(respW.Header().Get("X-Nomad-LastContact"), "missing last contact") // Check the policy require.Equal(p.ID, obj.(*structs.ScalingPolicy).ID) diff --git a/command/scaling_policy_list.go b/command/scaling_policy_list.go index 0dd09cd25..1a15a8c51 100644 --- a/command/scaling_policy_list.go +++ b/command/scaling_policy_list.go @@ -31,6 +31,9 @@ General Options: Policy Info Options: + -type + Filter scaling policies by type. + -json Output the scaling policy in its JSON format. @@ -48,6 +51,8 @@ func (s *ScalingPolicyListCommand) Synopsis() string { func (s *ScalingPolicyListCommand) AutocompleteFlags() complete.Flags { return mergeAutocompleteFlags(s.Meta.AutocompleteFlags(FlagSetClient), complete.Flags{ + "-job": complete.PredictNothing, + "-type": complete.PredictNothing, "-json": complete.PredictNothing, "-t": complete.PredictAnything, }) @@ -59,12 +64,14 @@ func (s *ScalingPolicyListCommand) Name() string { return "scaling policy list" // Run satisfies the cli.Command Run function. func (s *ScalingPolicyListCommand) Run(args []string) int { var json bool - var tmpl string + var tmpl, policyType, job string flags := s.Meta.FlagSet(s.Name(), FlagSetClient) flags.Usage = func() { s.Ui.Output(s.Help()) } flags.BoolVar(&json, "json", false, "") flags.StringVar(&tmpl, "t", "", "") + flags.StringVar(&policyType, "type", "", "") + flags.StringVar(&job, "job", "", "") if err := flags.Parse(args); err != nil { return 1 } @@ -81,7 +88,16 @@ func (s *ScalingPolicyListCommand) Run(args []string) int { return 1 } - policies, _, err := client.Scaling().ListPolicies(nil) + q := &api.QueryOptions{ + Params: map[string]string{}, + } + if policyType != "" { + q.Params["type"] = policyType + } + if job != "" { + q.Params["job"] = job + } + policies, _, err := client.Scaling().ListPolicies(q) if err != nil { s.Ui.Error(fmt.Sprintf("Error listing scaling policies: %s", err)) return 1 @@ -103,7 +119,7 @@ func (s *ScalingPolicyListCommand) Run(args []string) int { } // Create the output table header. - output := []string{"ID|Enabled|Target"} + output := []string{"ID|Enabled|Type|Target"} // Sort the list of policies based on their target. sortedPolicies := scalingPolicyStubList{policies: policies} @@ -112,8 +128,8 @@ func (s *ScalingPolicyListCommand) Run(args []string) int { // Iterate the policies and add to the output. for _, policy := range sortedPolicies.policies { output = append(output, fmt.Sprintf( - "%s|%v|%s", - policy.ID, policy.Enabled, formatScalingPolicyTarget(policy.Target))) + "%s|%v|%s|%s", + policy.ID, policy.Enabled, policy.Type, formatScalingPolicyTarget(policy.Target))) } // Output. diff --git a/command/scaling_policy_list_test.go b/command/scaling_policy_list_test.go index 2c4475908..860646559 100644 --- a/command/scaling_policy_list_test.go +++ b/command/scaling_policy_list_test.go @@ -1,52 +1,36 @@ package command import ( - "fmt" - "strings" "testing" + "github.com/mitchellh/cli" + "github.com/stretchr/testify/require" + "github.com/hashicorp/nomad/api" "github.com/hashicorp/nomad/helper" - "github.com/hashicorp/nomad/testutil" - "github.com/mitchellh/cli" ) func TestScalingPolicyListCommand_Run(t *testing.T) { + require := require.New(t) t.Parallel() - srv, client, url := testServer(t, true, nil) + srv, client, url := testServer(t, false, nil) defer srv.Shutdown() - testutil.WaitForResult(func() (bool, error) { - nodes, _, err := client.Nodes().List(nil) - if err != nil { - return false, err - } - if len(nodes) == 0 { - return false, fmt.Errorf("missing node") - } - if _, ok := nodes[0].Drivers["mock_driver"]; !ok { - return false, fmt.Errorf("mock_driver not ready") - } - return true, nil - }, func(err error) { - t.Fatalf("err: %s", err) - }) ui := cli.NewMockUi() cmd := &ScalingPolicyListCommand{Meta: Meta{Ui: ui}} // Perform an initial list, which should return zero results. - if code := cmd.Run([]string{"-address=" + url}); code != 0 { - t.Fatalf("expected cmd run exit code 0, got: %d", code) - } - if out := ui.OutputWriter.String(); !strings.Contains(out, "No policies found") { - t.Fatalf("expected no policies found within output: %v", out) - } + code := cmd.Run([]string{"-address=" + url}) + require.Equal(0, code) + out := ui.OutputWriter.String() + require.Contains(out, "No policies found") // Generate two test jobs. jobs := []*api.Job{testJob("scaling_policy_list_1"), testJob("scaling_policy_list_2")} // Generate an example scaling policy. scalingPolicy := api.ScalingPolicy{ + Type: api.ScalingPolicyTypeHorizontal, Enabled: helper.BoolToPtr(true), Min: helper.Int64ToPtr(1), Max: helper.Int64ToPtr(1), @@ -55,29 +39,18 @@ func TestScalingPolicyListCommand_Run(t *testing.T) { // Iterate the jobs, add the scaling policy and register. for _, job := range jobs { job.TaskGroups[0].Scaling = &scalingPolicy - resp, _, err := client.Jobs().Register(job, nil) - if err != nil { - t.Fatalf("err: %s", err) - } - if code := waitForSuccess(ui, client, fullId, t, resp.EvalID); code != 0 { - t.Fatalf("expected waitForSuccess exit code 0, got: %d", code) - } + _, _, err := client.Jobs().Register(job, nil) + require.NoError(err) } // Perform a new list which should yield results.. - if code := cmd.Run([]string{"-address=" + url}); code != 0 { - t.Fatalf("expected cmd run exit code 0, got: %d", code) - } - out := ui.OutputWriter.String() - if !strings.Contains(out, "ID") || - !strings.Contains(out, "Enabled") || - !strings.Contains(out, "Target") { - t.Fatalf("expected table headers within output: %v", out) - } - if !strings.Contains(out, "scaling_policy_list_1") { - t.Fatalf("expected job scaling_policy_list_1 within output: %v", out) - } - if !strings.Contains(out, "scaling_policy_list_2") { - t.Fatalf("expected job scaling_policy_list_2 within output: %v", out) - } + code = cmd.Run([]string{"-address=" + url}) + require.Equal(0, code) + out = ui.OutputWriter.String() + require.Contains(out, "ID") + require.Contains(out, "Enabled") + require.Contains(out, "Type") + require.Contains(out, "Target") + require.Contains(out, "scaling_policy_list_1") + require.Contains(out, "scaling_policy_list_2") } diff --git a/jobspec/parse_group.go b/jobspec/parse_group.go index 2514d639e..8977afd51 100644 --- a/jobspec/parse_group.go +++ b/jobspec/parse_group.go @@ -185,7 +185,7 @@ func parseGroups(result *api.Job, list *ast.ObjectList) error { // Parse scaling policy if o := listVal.Filter("scaling"); len(o.Items) > 0 { - if err := parseScalingPolicy(&g.Scaling, o); err != nil { + if err := parseGroupScalingPolicy(&g.Scaling, o); err != nil { return multierror.Prefix(err, "scaling ->") } } @@ -319,21 +319,39 @@ func parseVolumes(out *map[string]*api.VolumeRequest, list *ast.ObjectList) erro return nil } -func parseScalingPolicy(out **api.ScalingPolicy, list *ast.ObjectList) error { - list = list.Elem() +func parseGroupScalingPolicy(out **api.ScalingPolicy, list *ast.ObjectList) error { if len(list.Items) > 1 { return fmt.Errorf("only one 'scaling' block allowed") } + item := list.Items[0] + if len(item.Keys) != 0 { + return fmt.Errorf("task group scaling policy should not have a name") + } + p, err := parseScalingPolicy(item) + if err != nil { + return err + } - // Get our resource object - o := list.Items[0] + // group-specific validation + if p.Max == nil { + return fmt.Errorf("missing 'max'") + } + if p.Type == "" { + p.Type = "horizontal" + } else if p.Type != "horizontal" { + return fmt.Errorf("task group scaling policy had invalid type: %q", p.Type) + } + *out = p + return nil +} +func parseScalingPolicy(item *ast.ObjectItem) (*api.ScalingPolicy, error) { // We need this later var listVal *ast.ObjectList - if ot, ok := o.Val.(*ast.ObjectType); ok { + if ot, ok := item.Val.(*ast.ObjectType); ok { listVal = ot.List } else { - return fmt.Errorf("should be an object") + return nil, fmt.Errorf("should be an object") } valid := []string{ @@ -341,14 +359,15 @@ func parseScalingPolicy(out **api.ScalingPolicy, list *ast.ObjectList) error { "max", "policy", "enabled", + "type", } - if err := checkHCLKeys(o.Val, valid); err != nil { - return err + if err := checkHCLKeys(item.Val, valid); err != nil { + return nil, err } var m map[string]interface{} - if err := hcl.DecodeObject(&m, o.Val); err != nil { - return err + if err := hcl.DecodeObject(&m, item.Val); err != nil { + return nil, err } delete(m, "policy") @@ -358,30 +377,26 @@ func parseScalingPolicy(out **api.ScalingPolicy, list *ast.ObjectList) error { Result: &result, }) if err != nil { - return err + return nil, err } if err := dec.Decode(m); err != nil { - return err - } - if result.Max == nil { - return fmt.Errorf("missing 'max'") + return nil, err } // If we have policy, then parse that if o := listVal.Filter("policy"); len(o.Items) > 0 { if len(o.Elem().Items) > 1 { - return fmt.Errorf("only one 'policy' block allowed per 'scaling' block") + return nil, fmt.Errorf("only one 'policy' block allowed per 'scaling' block") } p := o.Elem().Items[0] var m map[string]interface{} if err := hcl.DecodeObject(&m, p.Val); err != nil { - return err + return nil, err } if err := mapstructure.WeakDecode(m, &result.Policy); err != nil { - return err + return nil, err } } - *out = &result - return nil + return &result, nil } diff --git a/jobspec/parse_task.go b/jobspec/parse_task.go index 07d026145..d43198bf2 100644 --- a/jobspec/parse_task.go +++ b/jobspec/parse_task.go @@ -2,6 +2,7 @@ package jobspec import ( "fmt" + "strings" "time" multierror "github.com/hashicorp/go-multierror" @@ -23,6 +24,7 @@ var ( "kill_timeout", "shutdown_delay", "kill_signal", + "scaling", } normalTaskKeys = append(commonTaskKeys, @@ -110,6 +112,7 @@ func parseTask(item *ast.ObjectItem, keys []string) (*api.Task, error) { delete(m, "vault") delete(m, "volume_mount") delete(m, "csi_plugin") + delete(m, "scaling") // Build the task var t api.Task @@ -276,6 +279,13 @@ func parseTask(item *ast.ObjectItem, keys []string) (*api.Task, error) { } } + // Parse scaling policies + if o := listVal.Filter("scaling"); len(o.Items) > 0 { + if err := parseTaskScalingPolicies(&t.ScalingPolicies, o); err != nil { + return nil, err + } + } + // If we have a vault block, then parse that if o := listVal.Filter("vault"); len(o.Items) > 0 { v := &api.Vault{ @@ -462,6 +472,56 @@ func parseTemplates(result *[]*api.Template, list *ast.ObjectList) error { return nil } +func parseTaskScalingPolicies(result *[]*api.ScalingPolicy, list *ast.ObjectList) error { + if len(list.Items) == 0 { + return nil + } + + errPrefix := "scaling ->" + // Go through each object and turn it into an actual result. + seen := make(map[string]bool) + for _, item := range list.Items { + if l := len(item.Keys); l == 0 { + return multierror.Prefix(fmt.Errorf("task scaling policy missing name"), errPrefix) + } else if l > 1 { + return multierror.Prefix(fmt.Errorf("task scaling policy should only have one name"), errPrefix) + } + n := item.Keys[0].Token.Value().(string) + errPrefix = fmt.Sprintf("scaling[%v] ->", n) + + var policyType string + switch strings.ToLower(n) { + case "cpu": + policyType = "vertical_cpu" + case "mem": + policyType = "vertical_mem" + default: + return multierror.Prefix(fmt.Errorf(`scaling policy name must be "cpu" or "mem"`), errPrefix) + } + + // Make sure we haven't already found this + if seen[n] { + return multierror.Prefix(fmt.Errorf("scaling policy cannot be defined more than once"), errPrefix) + } + seen[n] = true + + p, err := parseScalingPolicy(item) + if err != nil { + return multierror.Prefix(err, errPrefix) + } + + if p.Type == "" { + p.Type = policyType + } else if p.Type != policyType { + return multierror.Prefix(fmt.Errorf("policy had invalid 'type': %q", p.Type), errPrefix) + } + + *result = append(*result, p) + } + + return nil +} + func parseResources(result *api.Resources, list *ast.ObjectList) error { list = list.Elem() if len(list.Items) == 0 { diff --git a/jobspec/parse_test.go b/jobspec/parse_test.go index bb36383f4..b0f397f1b 100644 --- a/jobspec/parse_test.go +++ b/jobspec/parse_test.go @@ -1153,7 +1153,6 @@ func TestParse(t *testing.T) { }, false, }, - { "tg-service-check.hcl", &api.Job{ @@ -1383,7 +1382,6 @@ func TestParse(t *testing.T) { }, false, }, - { "tg-scaling-policy.hcl", &api.Job{ @@ -1393,8 +1391,9 @@ func TestParse(t *testing.T) { { Name: stringToPtr("group"), Scaling: &api.ScalingPolicy{ - Min: int64ToPtr(5), - Max: int64ToPtr(100), + Type: "horizontal", + Min: int64ToPtr(5), + Max: int64ToPtr(100), Policy: map[string]interface{}{ "foo": "bar", "b": true, @@ -1414,6 +1413,47 @@ func TestParse(t *testing.T) { }, false, }, + { + "task-scaling-policy.hcl", + &api.Job{ + ID: stringToPtr("foo"), + Name: stringToPtr("foo"), + TaskGroups: []*api.TaskGroup{ + { + Name: stringToPtr("bar"), + Tasks: []*api.Task{ + { + Name: "bar", + Driver: "docker", + ScalingPolicies: []*api.ScalingPolicy{ + { + Type: "vertical_cpu", + Target: nil, + Min: int64ToPtr(50), + Max: int64ToPtr(1000), + Policy: map[string]interface{}{ + "test": "cpu", + }, + Enabled: boolToPtr(true), + }, + { + Type: "vertical_mem", + Target: nil, + Min: int64ToPtr(128), + Max: int64ToPtr(1024), + Policy: map[string]interface{}{ + "test": "mem", + }, + Enabled: boolToPtr(false), + }, + }, + }, + }, + }, + }, + }, + false, + }, { "tg-service-connect-gateway-ingress.hcl", &api.Job{ @@ -1480,6 +1520,7 @@ func TestParse(t *testing.T) { { Name: stringToPtr("group"), Scaling: &api.ScalingPolicy{ + Type: "horizontal", Min: nil, Max: int64ToPtr(10), Policy: nil, @@ -1490,19 +1531,51 @@ func TestParse(t *testing.T) { }, false, }, - { "tg-scaling-policy-missing-max.hcl", nil, true, }, - { "tg-scaling-policy-multi-policy.hcl", nil, true, }, - + { + "tg-scaling-policy-with-label.hcl", + nil, + true, + }, + { + "tg-scaling-policy-invalid-type.hcl", + nil, + true, + }, + { + "task-scaling-policy-missing-name.hcl", + nil, + true, + }, + { + "task-scaling-policy-multi-name.hcl", + nil, + true, + }, + { + "task-scaling-policy-multi-cpu.hcl", + nil, + true, + }, + { + "task-scaling-policy-invalid-type.hcl", + nil, + true, + }, + { + "task-scaling-policy-invalid-resource.hcl", + nil, + true, + }, { "multiregion.hcl", &api.Job{ diff --git a/jobspec/test-fixtures/task-scaling-policy-invalid-resource.hcl b/jobspec/test-fixtures/task-scaling-policy-invalid-resource.hcl new file mode 100644 index 000000000..4f2bd9647 --- /dev/null +++ b/jobspec/test-fixtures/task-scaling-policy-invalid-resource.hcl @@ -0,0 +1,17 @@ +job "foo" { + task "bar" { + driver = "docker" + + scaling "wrong" { + enabled = true + min = 50 + max = 1000 + + policy { + test = "cpu" + } + } + + } +} + diff --git a/jobspec/test-fixtures/task-scaling-policy-invalid-type.hcl b/jobspec/test-fixtures/task-scaling-policy-invalid-type.hcl new file mode 100644 index 000000000..e8313ab61 --- /dev/null +++ b/jobspec/test-fixtures/task-scaling-policy-invalid-type.hcl @@ -0,0 +1,18 @@ +job "foo" { + task "bar" { + driver = "docker" + + scaling "cpu" { + type = "vertical_mem" + enabled = true + min = 50 + max = 1000 + + policy { + test = "cpu" + } + } + + } +} + diff --git a/jobspec/test-fixtures/task-scaling-policy-missing-name.hcl b/jobspec/test-fixtures/task-scaling-policy-missing-name.hcl new file mode 100644 index 000000000..03c31f77d --- /dev/null +++ b/jobspec/test-fixtures/task-scaling-policy-missing-name.hcl @@ -0,0 +1,17 @@ +job "foo" { + task "bar" { + driver = "docker" + + scaling { + enabled = true + min = 50 + max = 1000 + + policy { + test = "cpu" + } + } + + } +} + diff --git a/jobspec/test-fixtures/task-scaling-policy-multi-cpu.hcl b/jobspec/test-fixtures/task-scaling-policy-multi-cpu.hcl new file mode 100644 index 000000000..dc8ead75b --- /dev/null +++ b/jobspec/test-fixtures/task-scaling-policy-multi-cpu.hcl @@ -0,0 +1,27 @@ +job "foo" { + task "bar" { + driver = "docker" + + scaling "cpu" { + enabled = true + min = 50 + max = 1000 + + policy { + test = "cpu" + } + } + + scaling "cpu" { + enabled = true + min = 50 + max = 1000 + + policy { + test = "cpu" + } + } + + } +} + diff --git a/jobspec/test-fixtures/task-scaling-policy-multi-name.hcl b/jobspec/test-fixtures/task-scaling-policy-multi-name.hcl new file mode 100644 index 000000000..fd3fbc993 --- /dev/null +++ b/jobspec/test-fixtures/task-scaling-policy-multi-name.hcl @@ -0,0 +1,17 @@ +job "foo" { + task "bar" { + driver = "docker" + + scaling "cpu" "mem" { + enabled = true + min = 50 + max = 1000 + + policy { + test = "cpu" + } + } + + } +} + diff --git a/jobspec/test-fixtures/task-scaling-policy.hcl b/jobspec/test-fixtures/task-scaling-policy.hcl new file mode 100644 index 000000000..408dd7ca6 --- /dev/null +++ b/jobspec/test-fixtures/task-scaling-policy.hcl @@ -0,0 +1,27 @@ +job "foo" { + task "bar" { + driver = "docker" + + scaling "cpu" { + enabled = true + min = 50 + max = 1000 + + policy { + test = "cpu" + } + } + + scaling "mem" { + enabled = false + min = 128 + max = 1024 + + policy { + test = "mem" + } + } + + } +} + diff --git a/jobspec/test-fixtures/tg-scaling-policy-invalid-type.hcl b/jobspec/test-fixtures/tg-scaling-policy-invalid-type.hcl new file mode 100644 index 000000000..b5d605b4a --- /dev/null +++ b/jobspec/test-fixtures/tg-scaling-policy-invalid-type.hcl @@ -0,0 +1,17 @@ +job "elastic" { + group "group" { + scaling { + type = "vertical_cpu" + enabled = false + min = 5 + max = 100 + + policy { + foo = "bar" + b = true + val = 5 + f = 0.1 + } + } + } +} diff --git a/jobspec/test-fixtures/tg-scaling-policy-with-label.hcl b/jobspec/test-fixtures/tg-scaling-policy-with-label.hcl new file mode 100644 index 000000000..8f25ef090 --- /dev/null +++ b/jobspec/test-fixtures/tg-scaling-policy-with-label.hcl @@ -0,0 +1,16 @@ +job "elastic" { + group "group" { + scaling "no-label-allowed" { + enabled = false + min = 5 + max = 100 + + policy { + foo = "bar" + b = true + val = 5 + f = 0.1 + } + } + } +} diff --git a/jobspec2/hcl_conversions.go b/jobspec2/hcl_conversions.go index 413a9e148..928981b3d 100644 --- a/jobspec2/hcl_conversions.go +++ b/jobspec2/hcl_conversions.go @@ -3,6 +3,7 @@ package jobspec2 import ( "fmt" "reflect" + "strings" "time" "github.com/hashicorp/hcl/v2" @@ -18,6 +19,7 @@ var hclDecoder *gohcl.Decoder func init() { hclDecoder = newHCLDecoder() hclDecoder.RegisterBlockDecoder(reflect.TypeOf(api.TaskGroup{}), decodeTaskGroup) + hclDecoder.RegisterBlockDecoder(reflect.TypeOf(api.Task{}), decodeTask) } func newHCLDecoder() *gohcl.Decoder { @@ -266,6 +268,7 @@ func decodeTaskGroup(body hcl.Body, ctx *hcl.EvalContext, val interface{}) hcl.D } d := newHCLDecoder() + d.RegisterBlockDecoder(reflect.TypeOf(api.Task{}), decodeTask) diags = d.DecodeBody(tgBody, ctx, tg) if tgExtra.Vault != nil { @@ -276,6 +279,128 @@ func decodeTaskGroup(body hcl.Body, ctx *hcl.EvalContext, val interface{}) hcl.D } } + if tg.Scaling != nil { + if tg.Scaling.Type == "" { + tg.Scaling.Type = "horizontal" + } + diags = append(diags, validateGroupScalingPolicy(tg.Scaling, tgBody)...) + } return diags } + +func decodeTask(body hcl.Body, ctx *hcl.EvalContext, val interface{}) hcl.Diagnostics { + // special case scaling policy + t := val.(*api.Task) + b, remain, diags := body.PartialContent(&hcl.BodySchema{ + Blocks: []hcl.BlockHeaderSchema{ + {Type: "scaling", LabelNames: []string{"name"}}, + }, + }) + + diags = append(diags, decodeTaskScalingPolicies(b.Blocks, ctx, t)...) + + decoder := newHCLDecoder() + diags = append(diags, decoder.DecodeBody(remain, ctx, val)...) + + return diags +} + +func decodeTaskScalingPolicies(blocks hcl.Blocks, ctx *hcl.EvalContext, task *api.Task) hcl.Diagnostics { + if len(blocks) == 0 { + return nil + } + + var diags hcl.Diagnostics + seen := map[string]*hcl.Block{} + for _, b := range blocks { + label := strings.ToLower(b.Labels[0]) + var policyType string + switch label { + case "cpu": + policyType = "vertical_cpu" + case "mem": + policyType = "vertical_mem" + default: + diags = append(diags, &hcl.Diagnostic{ + Severity: hcl.DiagError, + Summary: "Invalid scaling policy name", + Detail: `scaling policy name must be "cpu" or "mem"`, + Subject: &b.LabelRanges[0], + }) + continue + } + + if prev, ok := seen[label]; ok { + diags = append(diags, &hcl.Diagnostic{ + Severity: hcl.DiagError, + Summary: fmt.Sprintf("Duplicate scaling %q block", label), + Detail: fmt.Sprintf( + "Only one scaling %s block is allowed. Another was defined at %s.", + label, prev.DefRange.String(), + ), + Subject: &b.DefRange, + }) + continue + } + seen[label] = b + + var p api.ScalingPolicy + diags = append(diags, hclDecoder.DecodeBody(b.Body, ctx, &p)...) + + if p.Type == "" { + p.Type = policyType + } else if p.Type != policyType { + diags = append(diags, &hcl.Diagnostic{ + Severity: hcl.DiagError, + Summary: "Invalid scaling policy type", + Detail: fmt.Sprintf( + "Invalid policy type, expected %q but found %q", + p.Type, policyType), + Subject: &b.DefRange, + }) + continue + } + + task.ScalingPolicies = append(task.ScalingPolicies, &p) + } + + return diags +} + +func validateGroupScalingPolicy(p *api.ScalingPolicy, body hcl.Body) hcl.Diagnostics { + // fast path: do nothing + if p.Max != nil && p.Type == "horizontal" { + return nil + } + + content, _, diags := body.PartialContent(&hcl.BodySchema{ + Blocks: []hcl.BlockHeaderSchema{{Type: "scaling"}}, + }) + + if len(content.Blocks) == 0 { + // unexpected, given that we have a scaling policy + return diags + } + + pc, _, diags := content.Blocks[0].Body.PartialContent(&hcl.BodySchema{ + Attributes: []hcl.AttributeSchema{ + {Name: "max", Required: true}, + {Name: "type", Required: false}, + }, + }) + + if p.Type != "horizontal" { + if attr, ok := pc.Attributes["type"]; ok { + diags = append(diags, &hcl.Diagnostic{ + Severity: hcl.DiagError, + Summary: "Invalid group scaling type", + Detail: fmt.Sprintf( + "task group scaling policy had invalid type: %q", + p.Type), + Subject: attr.Expr.Range().Ptr(), + }) + } + } + return diags +} diff --git a/jobspec2/parse_test.go b/jobspec2/parse_test.go index 875f3f634..d774aa616 100644 --- a/jobspec2/parse_test.go +++ b/jobspec2/parse_test.go @@ -145,3 +145,163 @@ dynamic "group" { require.Equal(t, "groupB", *out.TaskGroups[1].Name) require.Equal(t, "groupC", *out.TaskGroups[2].Name) } + +func TestParse_InvalidScalingSyntax(t *testing.T) { + cases := []struct { + name string + expectedErr string + hcl string + }{ + { + "valid", + "", + ` +job "example" { + group "g1" { + scaling { + max = 40 + type = "horizontal" + } + + task "t1" { + scaling "cpu" { + max = 20 + } + scaling "mem" { + max = 15 + } + } + } +} +`, + }, + { + "group missing max", + `argument "max" is required`, + ` +job "example" { + group "g1" { + scaling { + #max = 40 + type = "horizontal" + } + + task "t1" { + scaling "cpu" { + max = 20 + } + scaling "mem" { + max = 15 + } + } + } +} +`, + }, + { + "group invalid type", + `task group scaling policy had invalid type`, + ` +job "example" { + group "g1" { + scaling { + max = 40 + type = "invalid_type" + } + + task "t1" { + scaling "cpu" { + max = 20 + } + scaling "mem" { + max = 15 + } + } + } +} +`, + }, + { + "task invalid label", + `scaling policy name must be "cpu" or "mem"`, + ` +job "example" { + group "g1" { + scaling { + max = 40 + type = "horizontal" + } + + task "t1" { + scaling "not_cpu" { + max = 20 + } + scaling "mem" { + max = 15 + } + } + } +} +`, + }, + { + "task duplicate blocks", + `Duplicate scaling "cpu" block`, + ` +job "example" { + group "g1" { + scaling { + max = 40 + type = "horizontal" + } + + task "t1" { + scaling "cpu" { + max = 20 + } + scaling "cpu" { + max = 15 + } + } + } +} +`, + }, + { + "task invalid type", + `Invalid scaling policy type`, + ` +job "example" { + group "g1" { + scaling { + max = 40 + type = "horizontal" + } + + task "t1" { + scaling "cpu" { + max = 20 + type = "invalid" + } + scaling "mem" { + max = 15 + } + } + } +} +`, + }, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + _, err := ParseWithArgs(c.name+".hcl", strings.NewReader(c.hcl), nil, true) + if c.expectedErr == "" { + require.NoError(t, err) + } else { + require.Error(t, err) + require.Contains(t, err.Error(), c.expectedErr) + } + }) + } +} diff --git a/nomad/alloc_endpoint.go b/nomad/alloc_endpoint.go index 7d518c568..0d6d7210c 100644 --- a/nomad/alloc_endpoint.go +++ b/nomad/alloc_endpoint.go @@ -37,6 +37,10 @@ func (a *Alloc) List(args *structs.AllocListRequest, reply *structs.AllocListRes return structs.ErrPermissionDenied } + allow := func(ns string) bool { + return aclObj.AllowNsOp(ns, acl.NamespaceCapabilityListJobs) + } + // Setup the blocking query opts := blockingOptions{ queryOpts: &args.QueryOptions, @@ -48,7 +52,7 @@ func (a *Alloc) List(args *structs.AllocListRequest, reply *structs.AllocListRes prefix := args.QueryOptions.Prefix if args.RequestNamespace() == structs.AllNamespacesSentinel { - allowedNSes, err := allowedNSes(aclObj, state) + allowedNSes, err := allowedNSes(aclObj, state, allow) if err != nil { return err } diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go index ed16809d1..379c759d4 100644 --- a/nomad/job_endpoint.go +++ b/nomad/job_endpoint.go @@ -451,16 +451,16 @@ func propagateScalingPolicyIDs(old, new *structs.Job) error { oldIDs := make(map[string]string) if old != nil { - // jobs currently only have scaling policies on task groups, so we can - // find correspondences using task group names + // use the job-scoped key (includes type, group, and task) to uniquely + // identify policies in a job for _, p := range old.GetScalingPolicies() { - oldIDs[p.Target[structs.ScalingTargetGroup]] = p.ID + oldIDs[p.JobKey()] = p.ID } } // ignore any existing ID in the policy, they should be empty for _, p := range new.GetScalingPolicies() { - if id, ok := oldIDs[p.Target[structs.ScalingTargetGroup]]; ok { + if id, ok := oldIDs[p.JobKey()]; ok { p.ID = id } else { p.ID = uuid.Generate() @@ -1265,7 +1265,7 @@ func (j *Job) GetJobVersions(args *structs.JobVersionsRequest, // allowedNSes returns a set (as map of ns->true) of the namespaces a token has access to. // Returns `nil` set if the token has access to all namespaces // and ErrPermissionDenied if the token has no capabilities on any namespace. -func allowedNSes(aclObj *acl.ACL, state *state.StateStore) (map[string]bool, error) { +func allowedNSes(aclObj *acl.ACL, state *state.StateStore, allow func(ns string) bool) (map[string]bool, error) { if aclObj == nil || aclObj.IsManagement() { return nil, nil } @@ -1279,7 +1279,7 @@ func allowedNSes(aclObj *acl.ACL, state *state.StateStore) (map[string]bool, err r := make(map[string]bool, len(nses)) for _, ns := range nses { - if aclObj.AllowNsOp(ns, acl.NamespaceCapabilityListJobs) { + if allow(ns) { r[ns] = true } } @@ -1367,6 +1367,9 @@ func (j *Job) listAllNamespaces(args *structs.JobListRequest, reply *structs.Job return err } prefix := args.QueryOptions.Prefix + allow := func(ns string) bool { + return aclObj.AllowNsOp(ns, acl.NamespaceCapabilityListJobs) + } // Setup the blocking query opts := blockingOptions{ @@ -1374,7 +1377,7 @@ func (j *Job) listAllNamespaces(args *structs.JobListRequest, reply *structs.Job queryMeta: &reply.QueryMeta, run: func(ws memdb.WatchSet, state *state.StateStore) error { // check if user has permission to all namespaces - allowedNSes, err := allowedNSes(aclObj, state) + allowedNSes, err := allowedNSes(aclObj, state, allow) if err == structs.ErrPermissionDenied { // return empty jobs if token isn't authorized for any // namespace, matching other endpoints diff --git a/nomad/mock/mock.go b/nomad/mock/mock.go index 6e412b165..e8c555a10 100644 --- a/nomad/mock/mock.go +++ b/nomad/mock/mock.go @@ -1380,9 +1380,7 @@ func ScalingPolicy() *structs.ScalingPolicy { Policy: map[string]interface{}{ "a": "b", }, - Enabled: true, - CreateIndex: 10, - ModifyIndex: 20, + Enabled: true, } } diff --git a/nomad/scaling_endpoint.go b/nomad/scaling_endpoint.go index 89e1aa0a6..98da2ac53 100644 --- a/nomad/scaling_endpoint.go +++ b/nomad/scaling_endpoint.go @@ -1,6 +1,7 @@ package nomad import ( + "strings" "time" metrics "github.com/armon/go-metrics" @@ -8,6 +9,7 @@ import ( memdb "github.com/hashicorp/go-memdb" "github.com/hashicorp/nomad/acl" + "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/structs" ) @@ -19,16 +21,18 @@ type Scaling struct { } // ListPolicies is used to list the policies -func (a *Scaling) ListPolicies(args *structs.ScalingPolicyListRequest, - reply *structs.ScalingPolicyListResponse) error { +func (p *Scaling) ListPolicies(args *structs.ScalingPolicyListRequest, reply *structs.ScalingPolicyListResponse) error { - if done, err := a.srv.forward("Scaling.ListPolicies", args, args, reply); done { + if done, err := p.srv.forward("Scaling.ListPolicies", args, args, reply); done { return err } defer metrics.MeasureSince([]string{"nomad", "scaling", "list_policies"}, time.Now()) - // Check for list-job permissions - if aclObj, err := a.srv.ResolveToken(args.AuthToken); err != nil { + if args.RequestNamespace() == structs.AllNamespacesSentinel { + return p.listAllNamespaces(args, reply) + } + + if aclObj, err := p.srv.ResolveToken(args.AuthToken); err != nil { return err } else if aclObj != nil { hasListScalingPolicies := aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilityListScalingPolicies) @@ -45,22 +49,24 @@ func (a *Scaling) ListPolicies(args *structs.ScalingPolicyListRequest, queryMeta: &reply.QueryMeta, run: func(ws memdb.WatchSet, state *state.StateStore) error { // Iterate over all the policies - iter, err := state.ScalingPoliciesByNamespace(ws, args.Namespace) + var err error + var iter memdb.ResultIterator + if prefix := args.QueryOptions.Prefix; prefix != "" { + iter, err = state.ScalingPoliciesByIDPrefix(ws, args.RequestNamespace(), prefix) + } else if job := args.Job; job != "" { + iter, err = state.ScalingPoliciesByJob(ws, args.RequestNamespace(), job) + } else { + iter, err = state.ScalingPoliciesByNamespace(ws, args.Namespace, args.Type) + } + if err != nil { return err } // Convert all the policies to a list stub reply.Policies = nil - for { - raw := iter.Next() - if raw == nil { - break - } + for raw := iter.Next(); raw != nil; raw = iter.Next() { policy := raw.(*structs.ScalingPolicy) - // if _, ok := policies[policy.Target]; ok || mgt { - // reply.Policies = append(reply.Policies, policy.Stub()) - // } reply.Policies = append(reply.Policies, policy.Stub()) } @@ -70,28 +76,27 @@ func (a *Scaling) ListPolicies(args *structs.ScalingPolicyListRequest, return err } - // Ensure we never set the index to zero, otherwise a blocking query cannot be used. - // We floor the index at one, since realistically the first write must have a higher index. + // Don't return index zero, otherwise a blocking query cannot be used. if index == 0 { index = 1 } reply.Index = index return nil }} - return a.srv.blockingRPC(&opts) + return p.srv.blockingRPC(&opts) } // GetPolicy is used to get a specific policy -func (a *Scaling) GetPolicy(args *structs.ScalingPolicySpecificRequest, +func (p *Scaling) GetPolicy(args *structs.ScalingPolicySpecificRequest, reply *structs.SingleScalingPolicyResponse) error { - if done, err := a.srv.forward("Scaling.GetPolicy", args, args, reply); done { + if done, err := p.srv.forward("Scaling.GetPolicy", args, args, reply); done { return err } defer metrics.MeasureSince([]string{"nomad", "scaling", "get_policy"}, time.Now()) // Check for list-job permissions - if aclObj, err := a.srv.ResolveToken(args.AuthToken); err != nil { + if aclObj, err := p.srv.ResolveToken(args.AuthToken); err != nil { return err } else if aclObj != nil { hasReadScalingPolicy := aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilityReadScalingPolicy) @@ -129,5 +134,71 @@ func (a *Scaling) GetPolicy(args *structs.ScalingPolicySpecificRequest, reply.Index = index return nil }} - return a.srv.blockingRPC(&opts) + return p.srv.blockingRPC(&opts) +} + +func (j *Scaling) listAllNamespaces(args *structs.ScalingPolicyListRequest, reply *structs.ScalingPolicyListResponse) error { + // Check for list-job permissions + aclObj, err := j.srv.ResolveToken(args.AuthToken) + if err != nil { + return err + } + prefix := args.QueryOptions.Prefix + allow := func(ns string) bool { + return aclObj.AllowNsOp(ns, acl.NamespaceCapabilityListScalingPolicies) || + (aclObj.AllowNsOp(ns, acl.NamespaceCapabilityListJobs) && aclObj.AllowNsOp(ns, acl.NamespaceCapabilityReadJob)) + } + + // Setup the blocking query + opts := blockingOptions{ + queryOpts: &args.QueryOptions, + queryMeta: &reply.QueryMeta, + run: func(ws memdb.WatchSet, state *state.StateStore) error { + // check if user has permission to all namespaces + allowedNSes, err := allowedNSes(aclObj, state, allow) + if err == structs.ErrPermissionDenied { + // return empty if token isn't authorized for any namespace + reply.Policies = []*structs.ScalingPolicyListStub{} + return nil + } else if err != nil { + return err + } + + // Capture all the policies + var iter memdb.ResultIterator + if args.Type != "" { + iter, err = state.ScalingPoliciesByTypePrefix(ws, args.Type) + } else { + iter, err = state.ScalingPolicies(ws) + } + if err != nil { + return err + } + + var policies []*structs.ScalingPolicyListStub + for raw := iter.Next(); raw != nil; raw = iter.Next() { + policy := raw.(*structs.ScalingPolicy) + if allowedNSes != nil && !allowedNSes[policy.Target[structs.ScalingTargetNamespace]] { + // not permitted to this name namespace + continue + } + if prefix != "" && !strings.HasPrefix(policy.ID, prefix) { + continue + } + policies = append(policies, policy.Stub()) + } + reply.Policies = policies + + // Use the last index that affected the policies table or summary + index, err := state.Index("scaling_policy") + if err != nil { + return err + } + reply.Index = helper.Uint64Max(1, index) + + // Set the query response + j.srv.setQueryMeta(&reply.QueryMeta) + return nil + }} + return j.srv.blockingRPC(&opts) } diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 3fbc59fb1..d5e5f12f9 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -5,6 +5,7 @@ import ( "fmt" "reflect" "sort" + "strings" "time" log "github.com/hashicorp/go-hclog" @@ -1527,6 +1528,10 @@ func (s *StateStore) upsertJobImpl(index uint64, job *structs.Job, keepVersion b return fmt.Errorf("unable to update job scaling policies: %v", err) } + if err := s.updateJobRecommendations(index, txn, existingJob, job); err != nil { + return fmt.Errorf("unable to update job recommendations: %v", err) + } + if err := s.updateJobCSIPlugins(index, job, existingJob, txn); err != nil { return fmt.Errorf("unable to update job scaling policies: %v", err) } @@ -1644,6 +1649,11 @@ func (s *StateStore) DeleteJobTxn(index uint64, namespace, jobID string, txn Txn return fmt.Errorf("deleting job scaling policies failed: %v", err) } + // Delete any job recommendations + if err := s.deleteRecommendationsByJob(index, txn, job); err != nil { + return fmt.Errorf("deleting job recommendatons failed: %v", err) + } + // Delete the scaling events if _, err = txn.DeleteAll("scaling_event", "id", namespace, jobID); err != nil { return fmt.Errorf("deleting job scaling events failed: %v", err) @@ -4567,17 +4577,10 @@ func (s *StateStore) updateJobScalingPolicies(index uint64, job *structs.Job, tx ws := memdb.NewWatchSet() - if job.Stop { - if err := s.deleteJobScalingPolicies(index, job, txn); err != nil { - return fmt.Errorf("deleting job scaling policies failed: %v", err) - } - return nil - } - scalingPolicies := job.GetScalingPolicies() - newTargets := map[string]struct{}{} + newTargets := map[string]bool{} for _, p := range scalingPolicies { - newTargets[p.Target[structs.ScalingTargetGroup]] = struct{}{} + newTargets[p.JobKey()] = true } // find existing policies that need to be deleted deletedPolicies := []string{} @@ -4585,13 +4588,9 @@ func (s *StateStore) updateJobScalingPolicies(index uint64, job *structs.Job, tx if err != nil { return fmt.Errorf("ScalingPoliciesByJob lookup failed: %v", err) } - for { - raw := iter.Next() - if raw == nil { - break - } + for raw := iter.Next(); raw != nil; raw = iter.Next() { oldPolicy := raw.(*structs.ScalingPolicy) - if _, ok := newTargets[oldPolicy.Target[structs.ScalingTargetGroup]]; !ok { + if !newTargets[oldPolicy.JobKey()] { deletedPolicies = append(deletedPolicies, oldPolicy.ID) } } @@ -5460,12 +5459,11 @@ func (s *StateStore) UpsertScalingPoliciesTxn(index uint64, scalingPolicies []*s } policy.ID = existing.ID policy.CreateIndex = existing.CreateIndex - policy.ModifyIndex = index } else { // policy.ID must have been set already in Job.Register before log apply policy.CreateIndex = index - policy.ModifyIndex = index } + policy.ModifyIndex = index // Insert the scaling policy hadUpdates = true @@ -5474,7 +5472,7 @@ func (s *StateStore) UpsertScalingPoliciesTxn(index uint64, scalingPolicies []*s } } - // Update the indexes table for scaling policy + // Update the indexes table for scaling policy if we updated any policies if hadUpdates { if err := txn.Insert("index", &IndexEntry{"scaling_policy", index}); err != nil { return fmt.Errorf("index update failed: %v", err) @@ -5740,19 +5738,6 @@ func (s *StateStore) ScalingPolicies(ws memdb.WatchSet) (memdb.ResultIterator, e return iter, nil } -// ScalingPoliciesByType returns an iterator over scaling policies of a certain type. -func (s *StateStore) ScalingPoliciesByType(ws memdb.WatchSet, t string) (memdb.ResultIterator, error) { - txn := s.db.ReadTxn() - - iter, err := txn.Get("scaling_policy", "type", t) - if err != nil { - return nil, err - } - - ws.Add(iter.WatchCh()) - return iter, nil -} - // ScalingPoliciesByTypePrefix returns an iterator over scaling policies with a certain type prefix. func (s *StateStore) ScalingPoliciesByTypePrefix(ws memdb.WatchSet, t string) (memdb.ResultIterator, error) { txn := s.db.ReadTxn() @@ -5766,7 +5751,7 @@ func (s *StateStore) ScalingPoliciesByTypePrefix(ws memdb.WatchSet, t string) (m return iter, nil } -func (s *StateStore) ScalingPoliciesByNamespace(ws memdb.WatchSet, namespace string) (memdb.ResultIterator, error) { +func (s *StateStore) ScalingPoliciesByNamespace(ws memdb.WatchSet, namespace, typ string) (memdb.ResultIterator, error) { txn := s.db.ReadTxn() iter, err := txn.Get("scaling_policy", "target_prefix", namespace) @@ -5775,18 +5760,22 @@ func (s *StateStore) ScalingPoliciesByNamespace(ws memdb.WatchSet, namespace str } ws.Add(iter.WatchCh()) - filter := func(raw interface{}) bool { - d, ok := raw.(*structs.ScalingPolicy) - if !ok { - return true - } - return d.Target[structs.ScalingTargetNamespace] != namespace + // Wrap the iterator in a filter to exact match the namespace + iter = memdb.NewFilterIterator(iter, scalingPolicyNamespaceFilter(namespace)) + + // If policy type is specified as well, wrap again + if typ != "" { + iter = memdb.NewFilterIterator(iter, func(raw interface{}) bool { + p, ok := raw.(*structs.ScalingPolicy) + if !ok { + return true + } + return !strings.HasPrefix(p.Type, typ) + }) } - // Wrap the iterator in a filter - wrap := memdb.NewFilterIterator(iter, filter) - return wrap, nil + return iter, nil } func (s *StateStore) ScalingPoliciesByJob(ws memdb.WatchSet, namespace, jobID string) (memdb.ResultIterator, error) { @@ -5834,6 +5823,8 @@ func (s *StateStore) ScalingPolicyByID(ws memdb.WatchSet, id string) (*structs.S return nil, nil } +// ScalingPolicyByTargetAndType returns a fully-qualified policy against a target and policy type, +// or nil if it does not exist. This method does not honor the watchset on the policy type, just the target. func (s *StateStore) ScalingPolicyByTargetAndType(ws memdb.WatchSet, target map[string]string, typ string) (*structs.ScalingPolicy, error) { txn := s.db.ReadTxn() @@ -5847,6 +5838,7 @@ func (s *StateStore) ScalingPolicyByTargetAndType(ws memdb.WatchSet, target map[ if err != nil { return nil, fmt.Errorf("scaling_policy lookup failed: %v", err) } + ws.Add(it.WatchCh()) // Check for type @@ -5866,6 +5858,34 @@ func (s *StateStore) ScalingPolicyByTargetAndType(ws memdb.WatchSet, target map[ return nil, nil } +func (s *StateStore) ScalingPoliciesByIDPrefix(ws memdb.WatchSet, namespace string, prefix string) (memdb.ResultIterator, error) { + txn := s.db.ReadTxn() + + iter, err := txn.Get("scaling_policy", "id_prefix", prefix) + if err != nil { + return nil, fmt.Errorf("scaling policy lookup failed: %v", err) + } + + ws.Add(iter.WatchCh()) + + iter = memdb.NewFilterIterator(iter, scalingPolicyNamespaceFilter(namespace)) + + return iter, nil +} + +// scalingPolicyNamespaceFilter returns a filter function that filters all +// scaling policies not targeting the given namespace. +func scalingPolicyNamespaceFilter(namespace string) func(interface{}) bool { + return func(raw interface{}) bool { + p, ok := raw.(*structs.ScalingPolicy) + if !ok { + return true + } + + return p.Target[structs.ScalingTargetNamespace] != namespace + } +} + func (s *StateStore) EventSinks(ws memdb.WatchSet) (memdb.ResultIterator, error) { txn := s.db.ReadTxn() @@ -6216,6 +6236,7 @@ func (r *StateRestore) CSIVolumeRestore(volume *structs.CSIVolume) error { return nil } +// ScalingEventsRestore is used to restore scaling events for a job func (r *StateRestore) ScalingEventsRestore(jobEvents *structs.JobScalingEvents) error { if err := r.txn.Insert("scaling_event", jobEvents); err != nil { return fmt.Errorf("scaling event insert failed: %v", err) diff --git a/nomad/state/state_store_oss.go b/nomad/state/state_store_oss.go index c161caf81..e8f91195e 100644 --- a/nomad/state/state_store_oss.go +++ b/nomad/state/state_store_oss.go @@ -20,3 +20,13 @@ func (s *StateStore) quotaReconcile(index uint64, txn *txn, newQuota, oldQuota s func (s *StateStore) updateEntWithAlloc(index uint64, new, existing *structs.Allocation, txn *txn) error { return nil } + +// deleteRecommendationsByJob deletes all recommendations for the specified job +func (s *StateStore) deleteRecommendationsByJob(index uint64, txn Txn, job *structs.Job) error { + return nil +} + +// updateJobRecommendations updates/deletes job recommendations as necessary for a job update +func (s *StateStore) updateJobRecommendations(index uint64, txn Txn, prevJob, newJob *structs.Job) error { + return nil +} diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index 3045da7a5..8483f9ace 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -8632,12 +8632,12 @@ func TestStateStore_UpsertScalingPolicy_Namespace(t *testing.T) { policy2.Target[structs.ScalingTargetNamespace] = otherNamespace ws1 := memdb.NewWatchSet() - iter, err := state.ScalingPoliciesByNamespace(ws1, structs.DefaultNamespace) + iter, err := state.ScalingPoliciesByNamespace(ws1, structs.DefaultNamespace, "") require.NoError(err) require.Nil(iter.Next()) ws2 := memdb.NewWatchSet() - iter, err = state.ScalingPoliciesByNamespace(ws2, otherNamespace) + iter, err = state.ScalingPoliciesByNamespace(ws2, otherNamespace, "") require.NoError(err) require.Nil(iter.Next()) @@ -8646,7 +8646,7 @@ func TestStateStore_UpsertScalingPolicy_Namespace(t *testing.T) { require.True(watchFired(ws1)) require.True(watchFired(ws2)) - iter, err = state.ScalingPoliciesByNamespace(nil, structs.DefaultNamespace) + iter, err = state.ScalingPoliciesByNamespace(nil, structs.DefaultNamespace, "") require.NoError(err) policiesInDefaultNamespace := []string{} for { @@ -8658,7 +8658,7 @@ func TestStateStore_UpsertScalingPolicy_Namespace(t *testing.T) { } require.ElementsMatch([]string{policy.ID}, policiesInDefaultNamespace) - iter, err = state.ScalingPoliciesByNamespace(nil, otherNamespace) + iter, err = state.ScalingPoliciesByNamespace(nil, otherNamespace, "") require.NoError(err) policiesInOtherNamespace := []string{} for { @@ -8684,12 +8684,12 @@ func TestStateStore_UpsertScalingPolicy_Namespace_PrefixBug(t *testing.T) { policy2.Target[structs.ScalingTargetNamespace] = ns2 ws1 := memdb.NewWatchSet() - iter, err := state.ScalingPoliciesByNamespace(ws1, ns1) + iter, err := state.ScalingPoliciesByNamespace(ws1, ns1, "") require.NoError(err) require.Nil(iter.Next()) ws2 := memdb.NewWatchSet() - iter, err = state.ScalingPoliciesByNamespace(ws2, ns2) + iter, err = state.ScalingPoliciesByNamespace(ws2, ns2, "") require.NoError(err) require.Nil(iter.Next()) @@ -8698,7 +8698,7 @@ func TestStateStore_UpsertScalingPolicy_Namespace_PrefixBug(t *testing.T) { require.True(watchFired(ws1)) require.True(watchFired(ws2)) - iter, err = state.ScalingPoliciesByNamespace(nil, ns1) + iter, err = state.ScalingPoliciesByNamespace(nil, ns1, "") require.NoError(err) policiesInNS1 := []string{} for { @@ -8710,7 +8710,7 @@ func TestStateStore_UpsertScalingPolicy_Namespace_PrefixBug(t *testing.T) { } require.ElementsMatch([]string{policy1.ID}, policiesInNS1) - iter, err = state.ScalingPoliciesByNamespace(nil, ns2) + iter, err = state.ScalingPoliciesByNamespace(nil, ns2, "") require.NoError(err) policiesInNS2 := []string{} for { @@ -8723,37 +8723,6 @@ func TestStateStore_UpsertScalingPolicy_Namespace_PrefixBug(t *testing.T) { require.ElementsMatch([]string{policy2.ID}, policiesInNS2) } -func TestStateStore_UpsertJob_UpsertScalingPolicies(t *testing.T) { - t.Parallel() - - require := require.New(t) - - state := testStateStore(t) - job, policy := mock.JobWithScalingPolicy() - - // Create a watchset so we can test that upsert fires the watch - ws := memdb.NewWatchSet() - out, err := state.ScalingPolicyByTargetAndType(ws, policy.Target, policy.Type) - require.NoError(err) - require.Nil(out) - - var newIndex uint64 = 1000 - err = state.UpsertJob(structs.MsgTypeTestSetup, newIndex, job) - require.NoError(err) - require.True(watchFired(ws), "watch did not fire") - - ws = memdb.NewWatchSet() - out, err = state.ScalingPolicyByTargetAndType(ws, policy.Target, policy.Type) - require.NoError(err) - require.NotNil(out) - require.Equal(newIndex, out.CreateIndex) - require.Equal(newIndex, out.ModifyIndex) - - index, err := state.Index("scaling_policy") - require.NoError(err) - require.Equal(newIndex, index) -} - // Scaling Policy IDs are generated randomly during Job.Register // Subsequent updates of the job should preserve the ID for the scaling policy // associated with a given target. @@ -8883,7 +8852,7 @@ func TestStateStore_DeleteScalingPolicies(t *testing.T) { require.Nil(out) // Ensure we see both policies - iter, err := state.ScalingPoliciesByNamespace(ws, policy.Target[structs.ScalingTargetNamespace]) + iter, err := state.ScalingPoliciesByNamespace(ws, policy.Target[structs.ScalingTargetNamespace], "") require.NoError(err) count := 0 for { @@ -8967,13 +8936,12 @@ func TestStateStore_UnstopJob_UpsertScalingPolicies(t *testing.T) { // upsert a stopped job, verify that we don't fire the watcher or add any scaling policies err = state.UpsertJob(structs.MsgTypeTestSetup, 1000, job) require.NoError(err) - require.False(watchFired(ws)) - // stopped job should have no scaling policies, watcher doesn't fire + require.True(watchFired(ws)) list, err = state.ScalingPolicies(ws) require.NoError(err) - require.Nil(list.Next()) + require.NotNil(list.Next()) - // Establish a new watcher + // Establish a new watchset ws = memdb.NewWatchSet() _, err = state.ScalingPolicies(ws) require.NoError(err) @@ -8982,14 +8950,14 @@ func TestStateStore_UnstopJob_UpsertScalingPolicies(t *testing.T) { err = state.UpsertJob(structs.MsgTypeTestSetup, 1100, job) require.NoError(err) - // Ensure the scaling policy was added, watch was fired, index was advanced - require.True(watchFired(ws)) + // Ensure the scaling policy still exists, watch was not fired, index was not advanced out, err := state.ScalingPolicyByTargetAndType(nil, policy.Target, policy.Type) require.NoError(err) require.NotNil(out) index, err := state.Index("scaling_policy") require.NoError(err) - require.GreaterOrEqual(index, uint64(1100)) + require.EqualValues(index, 1000) + require.False(watchFired(ws)) } func TestStateStore_DeleteJob_DeleteScalingPolicies(t *testing.T) { @@ -9105,15 +9073,12 @@ func TestStateStore_ScalingPoliciesByType(t *testing.T) { pOther2.Type = "other-type-2" // Create search routine - search := func(t string) (count int, found []string, err error) { + search := func(t string) (found []string) { found = []string{} - iter, err := state.ScalingPoliciesByType(nil, t) - if err != nil { - return - } + iter, err := state.ScalingPoliciesByTypePrefix(nil, t) + require.NoError(err) for raw := iter.Next(); raw != nil; raw = iter.Next() { - count++ found = append(found, raw.(*structs.ScalingPolicy).Type) } return @@ -9126,47 +9091,23 @@ func TestStateStore_ScalingPoliciesByType(t *testing.T) { // Check if we can read horizontal policies expect := []string{pHorzA.Type, pHorzB.Type} - count, found, err := search(structs.ScalingPolicyTypeHorizontal) - - sort.Strings(found) - sort.Strings(expect) - - require.NoError(err) - require.Equal(expect, found) - require.Equal(2, count) + actual := search(structs.ScalingPolicyTypeHorizontal) + require.ElementsMatch(expect, actual) // Check if we can read policies of other types expect = []string{pOther1.Type} - count, found, err = search("other-type-1") + actual = search("other-type-1") + require.ElementsMatch(expect, actual) - sort.Strings(found) - sort.Strings(expect) - - require.NoError(err) - require.Equal(expect, found) - require.Equal(1, count) - - // Check if we can't read policies by prefix - expect = []string{} - count, found, err = search("other-type") - - sort.Strings(found) - sort.Strings(expect) - - require.NoError(err) - require.Equal(expect, found) - require.Equal(0, count) + // Check that we can read policies by prefix + expect = []string{"other-type-1", "other-type-2"} + actual = search("other-type") + require.Equal(expect, actual) // Check for empty result expect = []string{} - count, found, err = search("non-existing") - - sort.Strings(found) - sort.Strings(expect) - - require.NoError(err) - require.Equal(expect, found) - require.Equal(0, count) + actual = search("non-existing") + require.ElementsMatch(expect, actual) } func TestStateStore_ScalingPoliciesByTypePrefix(t *testing.T) { diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 501b4a4bc..481b157ba 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -1181,6 +1181,8 @@ type SingleScalingPolicyResponse struct { // ScalingPolicyListRequest is used to parameterize a scaling policy list request type ScalingPolicyListRequest struct { + Job string + Type string QueryOptions } @@ -5273,6 +5275,14 @@ type ScalingPolicy struct { ModifyIndex uint64 } +// JobKey returns a key that is unique to a job-scoped target, useful as a map +// key. This uses the policy type, plus target (group and task). +func (p *ScalingPolicy) JobKey() string { + return p.Type + "\000" + + p.Target[ScalingTargetGroup] + "\000" + + p.Target[ScalingTargetTask] +} + const ( ScalingTargetNamespace = "Namespace" ScalingTargetJob = "Job" @@ -5376,6 +5386,7 @@ func (p *ScalingPolicy) Diff(p2 *ScalingPolicy) bool { return !reflect.DeepEqual(*p, copy) } +// TarketTaskGroup updates a ScalingPolicy target to specify a given task group func (p *ScalingPolicy) TargetTaskGroup(job *Job, tg *TaskGroup) *ScalingPolicy { p.Target = map[string]string{ ScalingTargetNamespace: job.Namespace, @@ -5385,6 +5396,13 @@ func (p *ScalingPolicy) TargetTaskGroup(job *Job, tg *TaskGroup) *ScalingPolicy return p } +// TargetTask updates a ScalingPolicy target to specify a given task +func (p *ScalingPolicy) TargetTask(job *Job, tg *TaskGroup, task *Task) *ScalingPolicy { + p.TargetTaskGroup(job, tg) + p.Target[ScalingTargetTask] = task.Name + return p +} + func (p *ScalingPolicy) Stub() *ScalingPolicyListStub { stub := &ScalingPolicyListStub{ ID: p.ID, @@ -5410,6 +5428,8 @@ func (j *Job) GetScalingPolicies() []*ScalingPolicy { } } + ret = append(ret, j.GetEntScalingPolicies()...) + return ret } @@ -6541,7 +6561,8 @@ type Task struct { // attached to this task. VolumeMounts []*VolumeMount - // The kill signal to use for the task. This is an optional specification, + // ScalingPolicies is a list of scaling policies scoped to this task + ScalingPolicies []*ScalingPolicy // KillSignal is the kill signal to use for the task. This is an optional // specification and defaults to SIGINT diff --git a/nomad/structs/structs_oss.go b/nomad/structs/structs_oss.go index 20f71adec..ec540a82c 100644 --- a/nomad/structs/structs_oss.go +++ b/nomad/structs/structs_oss.go @@ -31,3 +31,7 @@ func (p *ScalingPolicy) validateType() multierror.Error { return mErr } + +func (j *Job) GetEntScalingPolicies() []*ScalingPolicy { + return nil +} diff --git a/tools/go.sum b/tools/go.sum index 8b9149ccf..427f55f98 100644 --- a/tools/go.sum +++ b/tools/go.sum @@ -5,8 +5,6 @@ github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAE github.com/OpenPeeDeeP/depguard v1.0.1 h1:VlW4R6jmBIv3/u1JNlawEvJMM4J+dPORPaZasQee8Us= github.com/OpenPeeDeeP/depguard v1.0.1/go.mod h1:xsIw86fROiiwelg+jB2uM9PiKihMMmUx/1V+TNhjQvM= github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg= -github.com/a8m/tree v0.0.0-20201019170308-9f4249a434f8 h1:CkFIJJAKEbZbM2tKmCqt/v9ivgpikjPu5lnDsk8huLE= -github.com/a8m/tree v0.0.0-20201019170308-9f4249a434f8/go.mod h1:FSdwKX97koS5efgm8WevNf7XS3PqtyFkKDDXrz778cg= github.com/a8m/tree v0.0.0-20201026183218-fce18e2a750e h1:8YO27VG7yrHhATTepO2FYLbGUB1wfYbkqoKiVwaAQ+Q= github.com/a8m/tree v0.0.0-20201026183218-fce18e2a750e/go.mod h1:FSdwKX97koS5efgm8WevNf7XS3PqtyFkKDDXrz778cg= github.com/agext/levenshtein v1.2.1 h1:QmvMAjj2aEICytGiWzmxoE0x2KZvE0fvmqMOfy2tjT8= diff --git a/vendor/github.com/hashicorp/nomad/api/recommendations.go b/vendor/github.com/hashicorp/nomad/api/recommendations.go new file mode 100644 index 000000000..47414f24b --- /dev/null +++ b/vendor/github.com/hashicorp/nomad/api/recommendations.go @@ -0,0 +1,123 @@ +package api + +// Recommendations is used to query the recommendations endpoints. +type Recommendations struct { + client *Client +} + +// Recommendations returns a new handle on the recommendations endpoints. +func (c *Client) Recommendations() *Recommendations { + return &Recommendations{client: c} +} + +// List is used to dump all of the recommendations in the cluster +func (r *Recommendations) List(q *QueryOptions) ([]*Recommendation, *QueryMeta, error) { + var resp []*Recommendation + qm, err := r.client.query("/v1/recommendations", &resp, q) + if err != nil { + return nil, qm, err + } + return resp, qm, nil +} + +// Info is used to return information on a single recommendation +func (r *Recommendations) Info(id string, q *QueryOptions) (*Recommendation, *QueryMeta, error) { + var resp Recommendation + qm, err := r.client.query("/v1/recommendation/"+id, &resp, q) + if err != nil { + return nil, nil, err + } + return &resp, qm, nil +} + +// Upsert is used to create or update a recommendation +func (r *Recommendations) Upsert(rec *Recommendation, q *WriteOptions) (*Recommendation, *WriteMeta, error) { + var resp Recommendation + wm, err := r.client.write("/v1/recommendation", rec, &resp, q) + if err != nil { + return nil, nil, err + } + return &resp, wm, nil +} + +// Delete is used to delete a list of recommendations +func (r *Recommendations) Delete(ids []string, q *WriteOptions) (*WriteMeta, error) { + req := &RecommendationApplyRequest{ + Apply: []string{}, + Dismiss: ids, + } + wm, err := r.client.write("/v1/recommendations/apply", req, nil, q) + if err != nil { + return nil, err + } + return wm, nil +} + +// Apply is used to apply a set of recommendations +func (r *Recommendations) Apply(ids []string, policyOverride bool) ( + *RecommendationApplyResponse, *WriteMeta, error) { + req := &RecommendationApplyRequest{ + Apply: ids, + PolicyOverride: policyOverride, + } + var resp RecommendationApplyResponse + wm, err := r.client.write("/v1/recommendations/apply", req, &resp, nil) + if err != nil { + return nil, nil, err + } + resp.WriteMeta = *wm + return &resp, wm, nil +} + +// Recommendation is used to serialize a recommendation. +type Recommendation struct { + ID string + Region string + Namespace string + JobID string + JobVersion uint64 + Group string + Task string + Resource string + Value int + Current int + Meta map[string]interface{} + Stats map[string]float64 + EnforceVersion bool + + SubmitTime int64 + + CreateIndex uint64 + ModifyIndex uint64 +} + +// RecommendationApplyRequest is used to apply and/or dismiss a set of recommendations +type RecommendationApplyRequest struct { + Apply []string + Dismiss []string + PolicyOverride bool +} + +// RecommendationApplyResponse is used to apply a set of recommendations +type RecommendationApplyResponse struct { + UpdatedJobs []*SingleRecommendationApplyResult + Errors []*SingleRecommendationApplyError + WriteMeta +} + +type SingleRecommendationApplyResult struct { + Namespace string + JobID string + JobModifyIndex uint64 + EvalID string + EvalCreateIndex uint64 + Warnings string + Recommendations []string +} + +type SingleRecommendationApplyError struct { + Namespace string + JobID string + Recommendations []string + Error string +} diff --git a/vendor/github.com/hashicorp/nomad/api/scaling.go b/vendor/github.com/hashicorp/nomad/api/scaling.go index 56a0a964a..de889172d 100644 --- a/vendor/github.com/hashicorp/nomad/api/scaling.go +++ b/vendor/github.com/hashicorp/nomad/api/scaling.go @@ -66,12 +66,12 @@ type ScalingPolicy struct { Max *int64 `hcl:"max,optional"` Policy map[string]interface{} `hcl:"policy,block"` Enabled *bool `hcl:"enabled,optional"` + Type string `hcl:"type,optional"` /* fields set by server */ ID string Namespace string - Type string Target map[string]string CreateIndex uint64 ModifyIndex uint64 diff --git a/vendor/github.com/hashicorp/nomad/api/tasks.go b/vendor/github.com/hashicorp/nomad/api/tasks.go index b331ff6b0..2406a9bc3 100644 --- a/vendor/github.com/hashicorp/nomad/api/tasks.go +++ b/vendor/github.com/hashicorp/nomad/api/tasks.go @@ -671,6 +671,7 @@ type Task struct { ShutdownDelay time.Duration `mapstructure:"shutdown_delay" hcl:"shutdown_delay,optional"` KillSignal string `mapstructure:"kill_signal" hcl:"kill_signal,optional"` Kind string `hcl:"kind,optional"` + ScalingPolicies []*ScalingPolicy `hcl:"scaling,block"` } func (t *Task) Canonicalize(tg *TaskGroup, job *Job) {