From 2dc95485d2dd05fd03900d01dc081be879cd4629 Mon Sep 17 00:00:00 2001 From: Lang Martin Date: Wed, 18 Mar 2020 15:29:03 -0400 Subject: [PATCH] csi: ACLs for plugin endpoints (#7380) * acl/policy: add PolicyList for global ACLs * acl/acl: plugin policy * acl/acl: maxPrivilege is required to allow "list" * nomad/csi_endpoint: enforce plugin access with PolicyPlugin * nomad/csi_endpoint: check job ACL swapped params * nomad/csi_endpoint_test: test alloc filtering * acl/policy: add namespace csi-register-plugin * nomad/job_endpoint: check csi-register-plugin ACL on registration * nomad/job_endpoint_test: add plugin job cases --- acl/acl.go | 26 +++++++++-- acl/policy.go | 48 +++++++++++--------- acl/policy_test.go | 22 ++++++++++ nomad/csi_endpoint.go | 37 ++++++++++------ nomad/csi_endpoint_test.go | 89 ++++++++++++++++++++++++++++++++++++-- nomad/job_endpoint.go | 6 +++ nomad/job_endpoint_test.go | 25 +++++++++++ nomad/testing.go | 3 +- 8 files changed, 212 insertions(+), 44 deletions(-) diff --git a/acl/acl.go b/acl/acl.go index 569158968..57b64814f 100644 --- a/acl/acl.go +++ b/acl/acl.go @@ -75,6 +75,8 @@ func maxPrivilege(a, b string) string { return PolicyWrite case a == PolicyRead || b == PolicyRead: return PolicyRead + case a == PolicyList || b == PolicyList: + return PolicyList default: return "" } @@ -483,11 +485,10 @@ func (a *ACL) AllowQuotaWrite() bool { // AllowPluginRead checks if read operations are allowed for all plugins func (a *ACL) AllowPluginRead() bool { - // ACL is nil only if ACLs are disabled - if a == nil { - return true - } switch { + // ACL is nil only if ACLs are disabled + case a == nil: + return true case a.management: return true case a.plugin == PolicyRead: @@ -497,6 +498,23 @@ func (a *ACL) AllowPluginRead() bool { } } +// AllowPluginList checks if list operations are allowed for all plugins +func (a *ACL) AllowPluginList() bool { + switch { + // ACL is nil only if ACLs are disabled + case a == nil: + return true + case a.management: + return true + case a.plugin == PolicyList: + return true + case a.plugin == PolicyRead: + return true + default: + return false + } +} + // IsManagement checks if this represents a management token func (a *ACL) IsManagement() bool { return a.management diff --git a/acl/policy.go b/acl/policy.go index 4287fa6d6..b4925577e 100644 --- a/acl/policy.go +++ b/acl/policy.go @@ -13,6 +13,7 @@ const ( // which always takes precedence and supercedes. PolicyDeny = "deny" PolicyRead = "read" + PolicyList = "list" PolicyWrite = "write" ) @@ -22,23 +23,22 @@ const ( // combined we take the union of all capabilities. If the deny capability is present, it // takes precedence and overwrites all other capabilities. - NamespaceCapabilityDeny = "deny" - NamespaceCapabilityListJobs = "list-jobs" - NamespaceCapabilityReadJob = "read-job" - NamespaceCapabilitySubmitJob = "submit-job" - NamespaceCapabilityDispatchJob = "dispatch-job" - NamespaceCapabilityReadLogs = "read-logs" - NamespaceCapabilityReadFS = "read-fs" - NamespaceCapabilityAllocExec = "alloc-exec" - NamespaceCapabilityAllocNodeExec = "alloc-node-exec" - NamespaceCapabilityAllocLifecycle = "alloc-lifecycle" - NamespaceCapabilitySentinelOverride = "sentinel-override" - NamespaceCapabilityPrivilegedTask = "privileged-task" - NamespaceCapabilityCSIAccess = "csi-access" - NamespaceCapabilityCSIWriteVolume = "csi-write-volume" - NamespaceCapabilityCSIReadVolume = "csi-read-volume" - NamespaceCapabilityCSIListVolume = "csi-list-volume" - NamespaceCapabilityCSIMountVolume = "csi-mount-volume" + NamespaceCapabilityDeny = "deny" + NamespaceCapabilityListJobs = "list-jobs" + NamespaceCapabilityReadJob = "read-job" + NamespaceCapabilitySubmitJob = "submit-job" + NamespaceCapabilityDispatchJob = "dispatch-job" + NamespaceCapabilityReadLogs = "read-logs" + NamespaceCapabilityReadFS = "read-fs" + NamespaceCapabilityAllocExec = "alloc-exec" + NamespaceCapabilityAllocNodeExec = "alloc-node-exec" + NamespaceCapabilityAllocLifecycle = "alloc-lifecycle" + NamespaceCapabilitySentinelOverride = "sentinel-override" + NamespaceCapabilityCSIRegisterPlugin = "csi-register-plugin" + NamespaceCapabilityCSIWriteVolume = "csi-write-volume" + NamespaceCapabilityCSIReadVolume = "csi-read-volume" + NamespaceCapabilityCSIListVolume = "csi-list-volume" + NamespaceCapabilityCSIMountVolume = "csi-mount-volume" ) var ( @@ -128,6 +128,15 @@ func isPolicyValid(policy string) bool { } } +func (p *PluginPolicy) isValid() bool { + switch p.Policy { + case PolicyDeny, PolicyRead, PolicyList: + return true + default: + return false + } +} + // isNamespaceCapabilityValid ensures the given capability is valid for a namespace policy func isNamespaceCapabilityValid(cap string) bool { switch cap { @@ -135,8 +144,7 @@ func isNamespaceCapabilityValid(cap string) bool { NamespaceCapabilitySubmitJob, NamespaceCapabilityDispatchJob, NamespaceCapabilityReadLogs, NamespaceCapabilityReadFS, NamespaceCapabilityAllocLifecycle, NamespaceCapabilityAllocExec, NamespaceCapabilityAllocNodeExec, - NamespaceCapabilityCSIAccess, // TODO(langmartin): remove after plugin caps are done - NamespaceCapabilityCSIReadVolume, NamespaceCapabilityCSIWriteVolume, NamespaceCapabilityCSIListVolume, NamespaceCapabilityCSIMountVolume: + NamespaceCapabilityCSIReadVolume, NamespaceCapabilityCSIWriteVolume, NamespaceCapabilityCSIListVolume, NamespaceCapabilityCSIMountVolume, NamespaceCapabilityCSIRegisterPlugin: return true // Separate the enterprise-only capabilities case NamespaceCapabilitySentinelOverride: @@ -282,7 +290,7 @@ func Parse(rules string) (*Policy, error) { return nil, fmt.Errorf("Invalid quota policy: %#v", p.Quota) } - if p.Plugin != nil && !isPolicyValid(p.Plugin.Policy) { + if p.Plugin != nil && !p.Plugin.isValid() { return nil, fmt.Errorf("Invalid plugin policy: %#v", p.Plugin) } return p, nil diff --git a/acl/policy_test.go b/acl/policy_test.go index 48b5a6a35..d8d21ac81 100644 --- a/acl/policy_test.go +++ b/acl/policy_test.go @@ -260,6 +260,28 @@ func TestParse(t *testing.T) { "Invalid host volume name", nil, }, + { + ` + plugin { + policy = "list" + } + `, + "", + &Policy{ + Plugin: &PluginPolicy{ + Policy: PolicyList, + }, + }, + }, + { + ` + plugin { + policy = "reader" + } + `, + "Invalid plugin policy", + nil, + }, } for idx, tc := range tcases { diff --git a/nomad/csi_endpoint.go b/nomad/csi_endpoint.go index 758570bec..15d106d2c 100644 --- a/nomad/csi_endpoint.go +++ b/nomad/csi_endpoint.go @@ -398,13 +398,12 @@ func (v *CSIPlugin) List(args *structs.CSIPluginListRequest, reply *structs.CSIP return err } - allowCSIAccess := acl.NamespaceValidator(acl.NamespaceCapabilityCSIAccess) aclObj, err := v.srv.QueryACLObj(&args.QueryOptions, false) if err != nil { return err } - if !allowCSIAccess(aclObj, args.RequestNamespace()) { + if !aclObj.AllowPluginList() { return structs.ErrPermissionDenied } @@ -430,9 +429,6 @@ func (v *CSIPlugin) List(args *structs.CSIPluginListRequest, reply *structs.CSIP } plug := raw.(*structs.CSIPlugin) - - // FIXME we should filter the ACL access for the plugin's - // namespace, but plugins don't currently have namespaces ps = append(ps, plug.Stub()) } @@ -448,16 +444,18 @@ func (v *CSIPlugin) Get(args *structs.CSIPluginGetRequest, reply *structs.CSIPlu return err } - allowCSIAccess := acl.NamespaceValidator(acl.NamespaceCapabilityCSIAccess) aclObj, err := v.srv.QueryACLObj(&args.QueryOptions, false) if err != nil { return err } - if !allowCSIAccess(aclObj, args.RequestNamespace()) { + if !aclObj.AllowPluginRead() { return structs.ErrPermissionDenied } + withAllocs := aclObj == nil || + aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilityReadJob) + metricsStart := time.Now() defer metrics.MeasureSince([]string{"nomad", "plugin", "get"}, metricsStart) @@ -470,15 +468,26 @@ func (v *CSIPlugin) Get(args *structs.CSIPluginGetRequest, reply *structs.CSIPlu return err } - if plug != nil { - plug, err = state.CSIPluginDenormalize(ws, plug.Copy()) - } - if err != nil { - return err + if plug == nil { + return nil } - // FIXME we should re-check the ACL access for the plugin's - // namespace, but plugins don't currently have namespaces + if withAllocs { + plug, err = state.CSIPluginDenormalize(ws, plug.Copy()) + if err != nil { + return err + } + + // Filter the allocation stubs by our namespace. withAllocs + // means we're allowed + var as []*structs.AllocListStub + for _, a := range plug.Allocations { + if a.Namespace == args.RequestNamespace() { + as = append(as, a) + } + } + plug.Allocations = as + } reply.Plugin = plug return v.srv.replySetIndex(csiPluginTable, &reply.QueryMeta) diff --git a/nomad/csi_endpoint_test.go b/nomad/csi_endpoint_test.go index 8548dc772..eec6ffc8a 100644 --- a/nomad/csi_endpoint_test.go +++ b/nomad/csi_endpoint_test.go @@ -490,8 +490,6 @@ func TestCSIPluginEndpoint_RegisterViaFingerprint(t *testing.T) { defer shutdown() testutil.WaitForLeader(t, srv.RPC) - ns := structs.DefaultNamespace - deleteNodes := CreateTestCSIPlugin(srv.fsm.State(), "foo") defer deleteNodes() @@ -501,8 +499,9 @@ func TestCSIPluginEndpoint_RegisterViaFingerprint(t *testing.T) { codec := rpcClient(t, srv) // Get the plugin back out - policy := mock.NamespacePolicy(ns, "", []string{acl.NamespaceCapabilityCSIAccess}) - getToken := mock.CreatePolicyAndToken(t, state, 1001, "csi-access", policy) + listJob := mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob}) + policy := mock.PluginPolicy("read") + listJob + getToken := mock.CreatePolicyAndToken(t, state, 1001, "plugin-read", policy) req2 := &structs.CSIPluginGetRequest{ ID: "foo", @@ -515,6 +514,13 @@ func TestCSIPluginEndpoint_RegisterViaFingerprint(t *testing.T) { err := msgpackrpc.CallWithCodec(codec, "CSIPlugin.Get", req2, resp2) require.NoError(t, err) + // Get requires plugin-read, not plugin-list + lPolicy := mock.PluginPolicy("list") + lTok := mock.CreatePolicyAndToken(t, state, 1003, "plugin-list", lPolicy) + req2.AuthToken = lTok.SecretID + err = msgpackrpc.CallWithCodec(codec, "CSIPlugin.Get", req2, resp2) + require.Error(t, err, "Permission denied") + // List plugins req3 := &structs.CSIPluginListRequest{ QueryOptions: structs.QueryOptions{ @@ -532,15 +538,90 @@ func TestCSIPluginEndpoint_RegisterViaFingerprint(t *testing.T) { require.NoError(t, err) require.Equal(t, 1, len(resp3.Plugins)) + // List allows plugin-list + req3.AuthToken = lTok.SecretID + err = msgpackrpc.CallWithCodec(codec, "CSIPlugin.List", req3, resp3) + require.NoError(t, err) + require.Equal(t, 1, len(resp3.Plugins)) + // Deregistration works deleteNodes() // Plugin is missing + req2.AuthToken = getToken.SecretID err = msgpackrpc.CallWithCodec(codec, "CSIPlugin.Get", req2, resp2) require.NoError(t, err) require.Nil(t, resp2.Plugin) } +// TestCSIPluginEndpoint_ACLNamespaceAlloc checks that allocations are filtered by namespace +// when getting plugins, and enforcing that the client has job-read ACL access to the +// namespace of the allocations +func TestCSIPluginEndpoint_ACLNamespaceAlloc(t *testing.T) { + t.Parallel() + srv, shutdown := TestServer(t, func(c *Config) { + c.NumSchedulers = 0 // Prevent automatic dequeue + }) + defer shutdown() + testutil.WaitForLeader(t, srv.RPC) + state := srv.fsm.State() + + // Setup ACLs + state.BootstrapACLTokens(1, 0, mock.ACLManagementToken()) + srv.config.ACLEnabled = true + codec := rpcClient(t, srv) + listJob := mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob}) + policy := mock.PluginPolicy("read") + listJob + getToken := mock.CreatePolicyAndToken(t, state, 1001, "plugin-read", policy) + + // Create the plugin and then some allocations to pretend to be the allocs that are + // running the plugin tasks + deleteNodes := CreateTestCSIPlugin(srv.fsm.State(), "foo") + defer deleteNodes() + + plug, _ := state.CSIPluginByID(memdb.NewWatchSet(), "foo") + var allocs []*structs.Allocation + for _, info := range plug.Controllers { + a := mock.Alloc() + a.ID = info.AllocID + allocs = append(allocs, a) + } + for _, info := range plug.Nodes { + a := mock.Alloc() + a.ID = info.AllocID + allocs = append(allocs, a) + } + + require.Equal(t, 3, len(allocs)) + allocs[0].Namespace = "notTheNamespace" + + err := state.UpsertAllocs(1003, allocs) + require.NoError(t, err) + + req := &structs.CSIPluginGetRequest{ + ID: "foo", + QueryOptions: structs.QueryOptions{ + Region: "global", + AuthToken: getToken.SecretID, + }, + } + resp := &structs.CSIPluginGetResponse{} + err = msgpackrpc.CallWithCodec(codec, "CSIPlugin.Get", req, resp) + require.NoError(t, err) + require.Equal(t, 2, len(resp.Plugin.Allocations)) + + for _, a := range resp.Plugin.Allocations { + require.Equal(t, structs.DefaultNamespace, a.Namespace) + } + + p2 := mock.PluginPolicy("read") + t2 := mock.CreatePolicyAndToken(t, state, 1004, "plugin-read2", p2) + req.AuthToken = t2.SecretID + err = msgpackrpc.CallWithCodec(codec, "CSIPlugin.Get", req, resp) + require.NoError(t, err) + require.Equal(t, 0, len(resp.Plugin.Allocations)) +} + func TestCSI_RPCVolumeAndPluginLookup(t *testing.T) { srv, shutdown := TestServer(t, func(c *Config) {}) defer shutdown() diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go index 10e0417a9..a0b17323a 100644 --- a/nomad/job_endpoint.go +++ b/nomad/job_endpoint.go @@ -139,6 +139,12 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis return structs.ErrPermissionDenied } } + + if t.CSIPluginConfig != nil { + if !aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilityCSIRegisterPlugin) { + return structs.ErrPermissionDenied + } + } } } diff --git a/nomad/job_endpoint_test.go b/nomad/job_endpoint_test.go index 4373a61f0..5954a2bc4 100644 --- a/nomad/job_endpoint_test.go +++ b/nomad/job_endpoint_test.go @@ -402,6 +402,16 @@ func TestJobEndpoint_Register_ACL(t *testing.T) { return j } + newCSIPluginJob := func() *structs.Job { + j := mock.Job() + t := j.TaskGroups[0].Tasks[0] + t.CSIPluginConfig = &structs.TaskCSIPluginConfig{ + ID: "foo", + Type: "node", + } + return j + } + submitJobPolicy := mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob, acl.NamespaceCapabilitySubmitJob}) submitJobToken := mock.CreatePolicyAndToken(t, s1.State(), 1001, "test-submit-job", submitJobPolicy) @@ -421,6 +431,9 @@ func TestJobEndpoint_Register_ACL(t *testing.T) { volumesPolicyReadOnly+ volumesPolicyCSIMount) + pluginPolicy := mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityCSIRegisterPlugin}) + pluginToken := mock.CreatePolicyAndToken(t, s1.State(), 1005, "test-csi-register-plugin", submitJobPolicy+pluginPolicy) + cases := []struct { Name string Job *structs.Job @@ -463,6 +476,18 @@ func TestJobEndpoint_Register_ACL(t *testing.T) { Token: submitJobWithVolumesReadOnlyToken.SecretID, ErrExpected: false, }, + { + Name: "with a token that can submit a job, plugin rejected", + Job: newCSIPluginJob(), + Token: submitJobToken.SecretID, + ErrExpected: true, + }, + { + Name: "with a token that also has csi-register-plugin, accepted", + Job: newCSIPluginJob(), + Token: pluginToken.SecretID, + ErrExpected: false, + }, } for _, tt := range cases { diff --git a/nomad/testing.go b/nomad/testing.go index c5593eeaa..9d63bbf6f 100644 --- a/nomad/testing.go +++ b/nomad/testing.go @@ -187,12 +187,11 @@ func CreateTestCSIPlugin(s *state.StateStore, id string) func() { } // Install healthy plugin fingerprinting results - allocID := uuid.Generate() for _, n := range ns[1:] { n.CSINodePlugins = map[string]*structs.CSIInfo{ id: { PluginID: id, - AllocID: allocID, + AllocID: uuid.Generate(), Healthy: true, HealthDescription: "healthy", RequiresControllerPlugin: true,