diff --git a/client/alloc_endpoint.go b/client/alloc_endpoint.go new file mode 100644 index 000000000..11d75e45c --- /dev/null +++ b/client/alloc_endpoint.go @@ -0,0 +1,76 @@ +package client + +import ( + "fmt" + "time" + + metrics "github.com/armon/go-metrics" + "github.com/hashicorp/nomad/acl" + cstructs "github.com/hashicorp/nomad/client/structs" + nstructs "github.com/hashicorp/nomad/nomad/structs" +) + +// Allocations endpoint is used for interacting with client allocations +type Allocations struct { + c *Client +} + +// GarbageCollectAll is used to garbage collect all allocations on a client. +func (a *Allocations) GarbageCollectAll(args *nstructs.GenericRequest, reply *nstructs.GenericResponse) error { + defer metrics.MeasureSince([]string{"client", "allocations", "garbage_collect_all"}, time.Now()) + + // Check node read permissions + if aclObj, err := a.c.ResolveToken(args.AuthToken); err != nil { + return err + } else if aclObj != nil && !aclObj.AllowNodeWrite() { + return nstructs.ErrPermissionDenied + } + + a.c.CollectAllAllocs() + return nil +} + +// GarbageCollect is used to garbage collect an allocation on a client. +func (a *Allocations) GarbageCollect(args *nstructs.AllocSpecificRequest, reply *nstructs.GenericResponse) error { + defer metrics.MeasureSince([]string{"client", "allocations", "garbage_collect"}, time.Now()) + + // Check node read permissions + if aclObj, err := a.c.ResolveToken(args.AuthToken); err != nil { + return err + } else if aclObj != nil && !aclObj.AllowNsOp(args.Namespace, acl.NamespaceCapabilitySubmitJob) { + return nstructs.ErrPermissionDenied + } + + if !a.c.CollectAllocation(args.AllocID) { + // Could not find alloc + return fmt.Errorf("unknown allocation %q", args.AllocID) + } + + return nil +} + +// Stats is used to collect allocation statistics +func (a *Allocations) Stats(args *cstructs.AllocStatsRequest, reply *cstructs.AllocStatsResponse) error { + defer metrics.MeasureSince([]string{"client", "allocations", "stats"}, time.Now()) + + // Check node read permissions + if aclObj, err := a.c.ResolveToken(args.AuthToken); err != nil { + return err + } else if aclObj != nil && !aclObj.AllowNsOp(args.Namespace, acl.NamespaceCapabilityReadJob) { + return nstructs.ErrPermissionDenied + } + + clientStats := a.c.StatsReporter() + aStats, err := clientStats.GetAllocStats(args.AllocID) + if err != nil { + return err + } + + stats, err := aStats.LatestAllocStats(args.Task) + if err != nil { + return err + } + + reply.Stats = stats + return nil +} diff --git a/client/alloc_endpoint_test.go b/client/alloc_endpoint_test.go new file mode 100644 index 000000000..9afc72deb --- /dev/null +++ b/client/alloc_endpoint_test.go @@ -0,0 +1,249 @@ +package client + +import ( + "fmt" + "testing" + + "github.com/hashicorp/nomad/acl" + "github.com/hashicorp/nomad/client/config" + cstructs "github.com/hashicorp/nomad/client/structs" + "github.com/hashicorp/nomad/nomad/mock" + nstructs "github.com/hashicorp/nomad/nomad/structs" + "github.com/hashicorp/nomad/testutil" + "github.com/stretchr/testify/require" +) + +func TestAllocations_GarbageCollectAll(t *testing.T) { + t.Parallel() + require := require.New(t) + client := TestClient(t, nil) + + req := &nstructs.GenericRequest{} + var resp nstructs.GenericResponse + require.Nil(client.ClientRPC("Allocations.GarbageCollectAll", &req, &resp)) +} + +func TestAllocations_GarbageCollectAll_ACL(t *testing.T) { + t.Parallel() + require := require.New(t) + server, addr, root := testACLServer(t, nil) + defer server.Shutdown() + + client := TestClient(t, func(c *config.Config) { + c.Servers = []string{addr} + c.ACLEnabled = true + }) + defer client.Shutdown() + + // Try request without a token and expect failure + { + req := &nstructs.GenericRequest{} + var resp nstructs.GenericResponse + err := client.ClientRPC("Allocations.GarbageCollectAll", &req, &resp) + require.NotNil(err) + require.EqualError(err, nstructs.ErrPermissionDenied.Error()) + } + + // Try request with an invalid token and expect failure + { + token := mock.CreatePolicyAndToken(t, server.State(), 1005, "invalid", mock.NodePolicy(acl.PolicyDeny)) + req := &nstructs.GenericRequest{} + req.AuthToken = token.SecretID + + var resp nstructs.GenericResponse + err := client.ClientRPC("Allocations.GarbageCollectAll", &req, &resp) + + require.NotNil(err) + require.EqualError(err, nstructs.ErrPermissionDenied.Error()) + } + + // Try request with a valid token + { + token := mock.CreatePolicyAndToken(t, server.State(), 1007, "valid", mock.NodePolicy(acl.PolicyWrite)) + req := &nstructs.GenericRequest{} + req.AuthToken = token.SecretID + var resp nstructs.GenericResponse + require.Nil(client.ClientRPC("Allocations.GarbageCollectAll", &req, &resp)) + } + + // Try request with a management token + { + req := &nstructs.GenericRequest{} + req.AuthToken = root.SecretID + var resp nstructs.GenericResponse + require.Nil(client.ClientRPC("Allocations.GarbageCollectAll", &req, &resp)) + } +} + +func TestAllocations_GarbageCollect(t *testing.T) { + t.Parallel() + require := require.New(t) + client := TestClient(t, nil) + + a := mock.Alloc() + require.Nil(client.addAlloc(a, "")) + + // Try with bad alloc + req := &nstructs.AllocSpecificRequest{} + var resp nstructs.GenericResponse + err := client.ClientRPC("Allocations.GarbageCollect", &req, &resp) + require.NotNil(err) + + // Try with good alloc + req.AllocID = a.ID + testutil.WaitForResult(func() (bool, error) { + var resp2 nstructs.GenericResponse + err := client.ClientRPC("Allocations.GarbageCollect", &req, &resp2) + return err == nil, err + }, func(err error) { + t.Fatalf("err: %v", err) + }) +} + +func TestAllocations_GarbageCollect_ACL(t *testing.T) { + t.Parallel() + require := require.New(t) + server, addr, root := testACLServer(t, nil) + defer server.Shutdown() + + client := TestClient(t, func(c *config.Config) { + c.Servers = []string{addr} + c.ACLEnabled = true + }) + defer client.Shutdown() + + // Try request without a token and expect failure + { + req := &nstructs.AllocSpecificRequest{} + var resp nstructs.GenericResponse + err := client.ClientRPC("Allocations.GarbageCollect", &req, &resp) + require.NotNil(err) + require.EqualError(err, nstructs.ErrPermissionDenied.Error()) + } + + // Try request with an invalid token and expect failure + { + token := mock.CreatePolicyAndToken(t, server.State(), 1005, "invalid", mock.NodePolicy(acl.PolicyDeny)) + req := &nstructs.AllocSpecificRequest{} + req.AuthToken = token.SecretID + + var resp nstructs.GenericResponse + err := client.ClientRPC("Allocations.GarbageCollect", &req, &resp) + + require.NotNil(err) + require.EqualError(err, nstructs.ErrPermissionDenied.Error()) + } + + // Try request with a valid token + { + token := mock.CreatePolicyAndToken(t, server.State(), 1005, "test-valid", + mock.NamespacePolicy(nstructs.DefaultNamespace, "", []string{acl.NamespaceCapabilitySubmitJob})) + req := &nstructs.AllocSpecificRequest{} + req.AuthToken = token.SecretID + req.Namespace = nstructs.DefaultNamespace + + var resp nstructs.GenericResponse + err := client.ClientRPC("Allocations.GarbageCollect", &req, &resp) + require.Contains(err.Error(), "unknown allocation") + } + + // Try request with a management token + { + req := &nstructs.AllocSpecificRequest{} + req.AuthToken = root.SecretID + + var resp nstructs.GenericResponse + err := client.ClientRPC("Allocations.GarbageCollect", &req, &resp) + require.Contains(err.Error(), "unknown allocation") + } +} + +func TestAllocations_Stats(t *testing.T) { + t.Parallel() + require := require.New(t) + client := TestClient(t, nil) + + a := mock.Alloc() + require.Nil(client.addAlloc(a, "")) + + // Try with bad alloc + req := &cstructs.AllocStatsRequest{} + var resp cstructs.AllocStatsResponse + err := client.ClientRPC("Allocations.Stats", &req, &resp) + require.NotNil(err) + + // Try with good alloc + req.AllocID = a.ID + testutil.WaitForResult(func() (bool, error) { + var resp2 cstructs.AllocStatsResponse + err := client.ClientRPC("Allocations.Stats", &req, &resp2) + if err != nil { + return false, err + } + if resp2.Stats == nil { + return false, fmt.Errorf("invalid stats object") + } + + return true, nil + }, func(err error) { + t.Fatalf("err: %v", err) + }) +} + +func TestAllocations_Stats_ACL(t *testing.T) { + t.Parallel() + require := require.New(t) + server, addr, root := testACLServer(t, nil) + defer server.Shutdown() + + client := TestClient(t, func(c *config.Config) { + c.Servers = []string{addr} + c.ACLEnabled = true + }) + defer client.Shutdown() + + // Try request without a token and expect failure + { + req := &cstructs.AllocStatsRequest{} + var resp cstructs.AllocStatsResponse + err := client.ClientRPC("Allocations.Stats", &req, &resp) + require.NotNil(err) + require.EqualError(err, nstructs.ErrPermissionDenied.Error()) + } + + // Try request with an invalid token and expect failure + { + token := mock.CreatePolicyAndToken(t, server.State(), 1005, "invalid", mock.NodePolicy(acl.PolicyDeny)) + req := &cstructs.AllocStatsRequest{} + req.AuthToken = token.SecretID + + var resp cstructs.AllocStatsResponse + err := client.ClientRPC("Allocations.Stats", &req, &resp) + + require.NotNil(err) + require.EqualError(err, nstructs.ErrPermissionDenied.Error()) + } + + // Try request with a valid token + { + token := mock.CreatePolicyAndToken(t, server.State(), 1005, "test-valid", + mock.NamespacePolicy(nstructs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob})) + req := &cstructs.AllocStatsRequest{} + req.AuthToken = token.SecretID + req.Namespace = nstructs.DefaultNamespace + + var resp cstructs.AllocStatsResponse + err := client.ClientRPC("Allocations.Stats", &req, &resp) + require.Contains(err.Error(), "unknown allocation") + } + + // Try request with a management token + { + req := &cstructs.AllocStatsRequest{} + req.AuthToken = root.SecretID + + var resp cstructs.AllocStatsResponse + err := client.ClientRPC("Allocations.Stats", &req, &resp) + require.Contains(err.Error(), "unknown allocation") + } +} diff --git a/client/rpc.go b/client/rpc.go index 0144c2bd2..1fe52288b 100644 --- a/client/rpc.go +++ b/client/rpc.go @@ -22,6 +22,7 @@ import ( type rpcEndpoints struct { ClientStats *ClientStats FileSystem *FileSystem + Allocations *Allocations } // ClientRPC is used to make a local, client only RPC call @@ -205,6 +206,7 @@ func (c *Client) setupClientRpc() { // Initialize the RPC handlers c.endpoints.ClientStats = &ClientStats{c} c.endpoints.FileSystem = NewFileSystemEndpoint(c) + c.endpoints.Allocations = &Allocations{c} // Create the RPC Server c.rpcServer = rpc.NewServer() @@ -215,6 +217,14 @@ func (c *Client) setupClientRpc() { go c.rpcConnListener() } +// setupClientRpcServer is used to populate a client RPC server with endpoints. +func (c *Client) setupClientRpcServer(server *rpc.Server) { + // Register the endpoints + server.Register(c.endpoints.ClientStats) + server.Register(c.endpoints.FileSystem) + server.Register(c.endpoints.Allocations) +} + // rpcConnListener is a long lived function that listens for new connections // being made on the connection pool and starts an RPC listener for each // connection. @@ -346,13 +356,6 @@ func (c *Client) handleStreamingConn(conn net.Conn) { handler(conn) } -// setupClientRpcServer is used to populate a client RPC server with endpoints. -func (c *Client) setupClientRpcServer(server *rpc.Server) { - // Register the endpoints - server.Register(c.endpoints.ClientStats) - server.Register(c.endpoints.FileSystem) -} - // resolveServer given a sever's address as a string, return it's resolved // net.Addr or an error. func resolveServer(s string) (net.Addr, error) { diff --git a/client/structs/structs.go b/client/structs/structs.go index ca496378b..dd0ec948d 100644 --- a/client/structs/structs.go +++ b/client/structs/structs.go @@ -152,6 +152,25 @@ type StreamErrWrapper struct { Payload []byte } +// AllocStatsRequest is used to request the resource usage of a given +// allocation, potentially filtering by task +type AllocStatsRequest struct { + // AllocID is the allocation to retrieves stats for + AllocID string + + // Task is an optional filter to only request stats for the task. + Task string + + structs.QueryOptions +} + +// AllocStatsResponse is used to return the resource usage of a given +// allocation. +type AllocStatsResponse struct { + Stats *AllocResourceUsage + structs.QueryMeta +} + // MemoryStats holds memory usage related stats type MemoryStats struct { RSS uint64