client implementation of alloc gc and stats

This commit is contained in:
Alex Dadgar
2018-02-05 16:16:20 -08:00
parent 4ac1e25478
commit ce37deebf4
4 changed files with 354 additions and 7 deletions

76
client/alloc_endpoint.go Normal file
View File

@@ -0,0 +1,76 @@
package client
import (
"fmt"
"time"
metrics "github.com/armon/go-metrics"
"github.com/hashicorp/nomad/acl"
cstructs "github.com/hashicorp/nomad/client/structs"
nstructs "github.com/hashicorp/nomad/nomad/structs"
)
// Allocations endpoint is used for interacting with client allocations
type Allocations struct {
c *Client
}
// GarbageCollectAll is used to garbage collect all allocations on a client.
func (a *Allocations) GarbageCollectAll(args *nstructs.GenericRequest, reply *nstructs.GenericResponse) error {
defer metrics.MeasureSince([]string{"client", "allocations", "garbage_collect_all"}, time.Now())
// Check node read permissions
if aclObj, err := a.c.ResolveToken(args.AuthToken); err != nil {
return err
} else if aclObj != nil && !aclObj.AllowNodeWrite() {
return nstructs.ErrPermissionDenied
}
a.c.CollectAllAllocs()
return nil
}
// GarbageCollect is used to garbage collect an allocation on a client.
func (a *Allocations) GarbageCollect(args *nstructs.AllocSpecificRequest, reply *nstructs.GenericResponse) error {
defer metrics.MeasureSince([]string{"client", "allocations", "garbage_collect"}, time.Now())
// Check node read permissions
if aclObj, err := a.c.ResolveToken(args.AuthToken); err != nil {
return err
} else if aclObj != nil && !aclObj.AllowNsOp(args.Namespace, acl.NamespaceCapabilitySubmitJob) {
return nstructs.ErrPermissionDenied
}
if !a.c.CollectAllocation(args.AllocID) {
// Could not find alloc
return fmt.Errorf("unknown allocation %q", args.AllocID)
}
return nil
}
// Stats is used to collect allocation statistics
func (a *Allocations) Stats(args *cstructs.AllocStatsRequest, reply *cstructs.AllocStatsResponse) error {
defer metrics.MeasureSince([]string{"client", "allocations", "stats"}, time.Now())
// Check node read permissions
if aclObj, err := a.c.ResolveToken(args.AuthToken); err != nil {
return err
} else if aclObj != nil && !aclObj.AllowNsOp(args.Namespace, acl.NamespaceCapabilityReadJob) {
return nstructs.ErrPermissionDenied
}
clientStats := a.c.StatsReporter()
aStats, err := clientStats.GetAllocStats(args.AllocID)
if err != nil {
return err
}
stats, err := aStats.LatestAllocStats(args.Task)
if err != nil {
return err
}
reply.Stats = stats
return nil
}

View File

@@ -0,0 +1,249 @@
package client
import (
"fmt"
"testing"
"github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/client/config"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/nomad/mock"
nstructs "github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/require"
)
func TestAllocations_GarbageCollectAll(t *testing.T) {
t.Parallel()
require := require.New(t)
client := TestClient(t, nil)
req := &nstructs.GenericRequest{}
var resp nstructs.GenericResponse
require.Nil(client.ClientRPC("Allocations.GarbageCollectAll", &req, &resp))
}
func TestAllocations_GarbageCollectAll_ACL(t *testing.T) {
t.Parallel()
require := require.New(t)
server, addr, root := testACLServer(t, nil)
defer server.Shutdown()
client := TestClient(t, func(c *config.Config) {
c.Servers = []string{addr}
c.ACLEnabled = true
})
defer client.Shutdown()
// Try request without a token and expect failure
{
req := &nstructs.GenericRequest{}
var resp nstructs.GenericResponse
err := client.ClientRPC("Allocations.GarbageCollectAll", &req, &resp)
require.NotNil(err)
require.EqualError(err, nstructs.ErrPermissionDenied.Error())
}
// Try request with an invalid token and expect failure
{
token := mock.CreatePolicyAndToken(t, server.State(), 1005, "invalid", mock.NodePolicy(acl.PolicyDeny))
req := &nstructs.GenericRequest{}
req.AuthToken = token.SecretID
var resp nstructs.GenericResponse
err := client.ClientRPC("Allocations.GarbageCollectAll", &req, &resp)
require.NotNil(err)
require.EqualError(err, nstructs.ErrPermissionDenied.Error())
}
// Try request with a valid token
{
token := mock.CreatePolicyAndToken(t, server.State(), 1007, "valid", mock.NodePolicy(acl.PolicyWrite))
req := &nstructs.GenericRequest{}
req.AuthToken = token.SecretID
var resp nstructs.GenericResponse
require.Nil(client.ClientRPC("Allocations.GarbageCollectAll", &req, &resp))
}
// Try request with a management token
{
req := &nstructs.GenericRequest{}
req.AuthToken = root.SecretID
var resp nstructs.GenericResponse
require.Nil(client.ClientRPC("Allocations.GarbageCollectAll", &req, &resp))
}
}
func TestAllocations_GarbageCollect(t *testing.T) {
t.Parallel()
require := require.New(t)
client := TestClient(t, nil)
a := mock.Alloc()
require.Nil(client.addAlloc(a, ""))
// Try with bad alloc
req := &nstructs.AllocSpecificRequest{}
var resp nstructs.GenericResponse
err := client.ClientRPC("Allocations.GarbageCollect", &req, &resp)
require.NotNil(err)
// Try with good alloc
req.AllocID = a.ID
testutil.WaitForResult(func() (bool, error) {
var resp2 nstructs.GenericResponse
err := client.ClientRPC("Allocations.GarbageCollect", &req, &resp2)
return err == nil, err
}, func(err error) {
t.Fatalf("err: %v", err)
})
}
func TestAllocations_GarbageCollect_ACL(t *testing.T) {
t.Parallel()
require := require.New(t)
server, addr, root := testACLServer(t, nil)
defer server.Shutdown()
client := TestClient(t, func(c *config.Config) {
c.Servers = []string{addr}
c.ACLEnabled = true
})
defer client.Shutdown()
// Try request without a token and expect failure
{
req := &nstructs.AllocSpecificRequest{}
var resp nstructs.GenericResponse
err := client.ClientRPC("Allocations.GarbageCollect", &req, &resp)
require.NotNil(err)
require.EqualError(err, nstructs.ErrPermissionDenied.Error())
}
// Try request with an invalid token and expect failure
{
token := mock.CreatePolicyAndToken(t, server.State(), 1005, "invalid", mock.NodePolicy(acl.PolicyDeny))
req := &nstructs.AllocSpecificRequest{}
req.AuthToken = token.SecretID
var resp nstructs.GenericResponse
err := client.ClientRPC("Allocations.GarbageCollect", &req, &resp)
require.NotNil(err)
require.EqualError(err, nstructs.ErrPermissionDenied.Error())
}
// Try request with a valid token
{
token := mock.CreatePolicyAndToken(t, server.State(), 1005, "test-valid",
mock.NamespacePolicy(nstructs.DefaultNamespace, "", []string{acl.NamespaceCapabilitySubmitJob}))
req := &nstructs.AllocSpecificRequest{}
req.AuthToken = token.SecretID
req.Namespace = nstructs.DefaultNamespace
var resp nstructs.GenericResponse
err := client.ClientRPC("Allocations.GarbageCollect", &req, &resp)
require.Contains(err.Error(), "unknown allocation")
}
// Try request with a management token
{
req := &nstructs.AllocSpecificRequest{}
req.AuthToken = root.SecretID
var resp nstructs.GenericResponse
err := client.ClientRPC("Allocations.GarbageCollect", &req, &resp)
require.Contains(err.Error(), "unknown allocation")
}
}
func TestAllocations_Stats(t *testing.T) {
t.Parallel()
require := require.New(t)
client := TestClient(t, nil)
a := mock.Alloc()
require.Nil(client.addAlloc(a, ""))
// Try with bad alloc
req := &cstructs.AllocStatsRequest{}
var resp cstructs.AllocStatsResponse
err := client.ClientRPC("Allocations.Stats", &req, &resp)
require.NotNil(err)
// Try with good alloc
req.AllocID = a.ID
testutil.WaitForResult(func() (bool, error) {
var resp2 cstructs.AllocStatsResponse
err := client.ClientRPC("Allocations.Stats", &req, &resp2)
if err != nil {
return false, err
}
if resp2.Stats == nil {
return false, fmt.Errorf("invalid stats object")
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
}
func TestAllocations_Stats_ACL(t *testing.T) {
t.Parallel()
require := require.New(t)
server, addr, root := testACLServer(t, nil)
defer server.Shutdown()
client := TestClient(t, func(c *config.Config) {
c.Servers = []string{addr}
c.ACLEnabled = true
})
defer client.Shutdown()
// Try request without a token and expect failure
{
req := &cstructs.AllocStatsRequest{}
var resp cstructs.AllocStatsResponse
err := client.ClientRPC("Allocations.Stats", &req, &resp)
require.NotNil(err)
require.EqualError(err, nstructs.ErrPermissionDenied.Error())
}
// Try request with an invalid token and expect failure
{
token := mock.CreatePolicyAndToken(t, server.State(), 1005, "invalid", mock.NodePolicy(acl.PolicyDeny))
req := &cstructs.AllocStatsRequest{}
req.AuthToken = token.SecretID
var resp cstructs.AllocStatsResponse
err := client.ClientRPC("Allocations.Stats", &req, &resp)
require.NotNil(err)
require.EqualError(err, nstructs.ErrPermissionDenied.Error())
}
// Try request with a valid token
{
token := mock.CreatePolicyAndToken(t, server.State(), 1005, "test-valid",
mock.NamespacePolicy(nstructs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob}))
req := &cstructs.AllocStatsRequest{}
req.AuthToken = token.SecretID
req.Namespace = nstructs.DefaultNamespace
var resp cstructs.AllocStatsResponse
err := client.ClientRPC("Allocations.Stats", &req, &resp)
require.Contains(err.Error(), "unknown allocation")
}
// Try request with a management token
{
req := &cstructs.AllocStatsRequest{}
req.AuthToken = root.SecretID
var resp cstructs.AllocStatsResponse
err := client.ClientRPC("Allocations.Stats", &req, &resp)
require.Contains(err.Error(), "unknown allocation")
}
}

View File

@@ -22,6 +22,7 @@ import (
type rpcEndpoints struct {
ClientStats *ClientStats
FileSystem *FileSystem
Allocations *Allocations
}
// ClientRPC is used to make a local, client only RPC call
@@ -205,6 +206,7 @@ func (c *Client) setupClientRpc() {
// Initialize the RPC handlers
c.endpoints.ClientStats = &ClientStats{c}
c.endpoints.FileSystem = NewFileSystemEndpoint(c)
c.endpoints.Allocations = &Allocations{c}
// Create the RPC Server
c.rpcServer = rpc.NewServer()
@@ -215,6 +217,14 @@ func (c *Client) setupClientRpc() {
go c.rpcConnListener()
}
// setupClientRpcServer is used to populate a client RPC server with endpoints.
func (c *Client) setupClientRpcServer(server *rpc.Server) {
// Register the endpoints
server.Register(c.endpoints.ClientStats)
server.Register(c.endpoints.FileSystem)
server.Register(c.endpoints.Allocations)
}
// rpcConnListener is a long lived function that listens for new connections
// being made on the connection pool and starts an RPC listener for each
// connection.
@@ -346,13 +356,6 @@ func (c *Client) handleStreamingConn(conn net.Conn) {
handler(conn)
}
// setupClientRpcServer is used to populate a client RPC server with endpoints.
func (c *Client) setupClientRpcServer(server *rpc.Server) {
// Register the endpoints
server.Register(c.endpoints.ClientStats)
server.Register(c.endpoints.FileSystem)
}
// resolveServer given a sever's address as a string, return it's resolved
// net.Addr or an error.
func resolveServer(s string) (net.Addr, error) {

View File

@@ -152,6 +152,25 @@ type StreamErrWrapper struct {
Payload []byte
}
// AllocStatsRequest is used to request the resource usage of a given
// allocation, potentially filtering by task
type AllocStatsRequest struct {
// AllocID is the allocation to retrieves stats for
AllocID string
// Task is an optional filter to only request stats for the task.
Task string
structs.QueryOptions
}
// AllocStatsResponse is used to return the resource usage of a given
// allocation.
type AllocStatsResponse struct {
Stats *AllocResourceUsage
structs.QueryMeta
}
// MemoryStats holds memory usage related stats
type MemoryStats struct {
RSS uint64