mirror of
https://github.com/kemko/nomad.git
synced 2026-01-01 16:05:42 +03:00
Stats Endpoint
This commit is contained in:
@@ -6,6 +6,7 @@ import (
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"net"
|
||||
"net/rpc"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
@@ -157,6 +158,10 @@ type Client struct {
|
||||
// clientACLResolver holds the ACL resolution state
|
||||
clientACLResolver
|
||||
|
||||
// rpcServer is used to serve RPCs by the local agent.
|
||||
rpcServer *rpc.Server
|
||||
endpoints rpcEndpoints
|
||||
|
||||
// baseLabels are used when emitting tagged metrics. All client metrics will
|
||||
// have these tags, and optionally more.
|
||||
baseLabels []metrics.Label
|
||||
@@ -202,6 +207,9 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic
|
||||
return nil, fmt.Errorf("failed to initialize client: %v", err)
|
||||
}
|
||||
|
||||
// Set up the client's RPC server
|
||||
c.setupClientRpc()
|
||||
|
||||
// Initialize the ACL state
|
||||
if err := c.clientACLResolver.init(); err != nil {
|
||||
return nil, fmt.Errorf("failed to initialize ACL state: %v", err)
|
||||
@@ -474,35 +482,6 @@ func (c *Client) Shutdown() error {
|
||||
return c.saveState()
|
||||
}
|
||||
|
||||
// RPC is used to forward an RPC call to a nomad server, or fail if no servers.
|
||||
func (c *Client) RPC(method string, args interface{}, reply interface{}) error {
|
||||
// Invoke the RPCHandler if it exists
|
||||
if c.config.RPCHandler != nil {
|
||||
return c.config.RPCHandler.RPC(method, args, reply)
|
||||
}
|
||||
|
||||
servers := c.servers.all()
|
||||
if len(servers) == 0 {
|
||||
return noServersErr
|
||||
}
|
||||
|
||||
var mErr multierror.Error
|
||||
for _, s := range servers {
|
||||
// Make the RPC request
|
||||
if err := c.connPool.RPC(c.Region(), s.addr, c.RPCMajorVersion(), method, args, reply); err != nil {
|
||||
errmsg := fmt.Errorf("RPC failed to server %s: %v", s.addr, err)
|
||||
mErr.Errors = append(mErr.Errors, errmsg)
|
||||
c.logger.Printf("[DEBUG] client: %v", errmsg)
|
||||
c.servers.failed(s)
|
||||
continue
|
||||
}
|
||||
c.servers.good(s)
|
||||
return nil
|
||||
}
|
||||
|
||||
return mErr.ErrorOrNil()
|
||||
}
|
||||
|
||||
// Stats is used to return statistics for debugging and insight
|
||||
// for various sub-systems
|
||||
func (c *Client) Stats() map[string]map[string]string {
|
||||
@@ -2164,19 +2143,3 @@ func (c *Client) allAllocs() map[string]*structs.Allocation {
|
||||
}
|
||||
return allocs
|
||||
}
|
||||
|
||||
// resolveServer takes a server's address as a string and returns its resolved
|
||||
// net.Addr or an error.
|
||||
func resolveServer(s string) (net.Addr, error) {
|
||||
const defaultClientPort = "4647" // default client RPC port
|
||||
host, port, err := net.SplitHostPort(s)
|
||||
if err != nil {
|
||||
if strings.Contains(err.Error(), "missing port") {
|
||||
host = s
|
||||
port = defaultClientPort
|
||||
} else {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return net.ResolveTCPAddr("tcp", net.JoinHostPort(host, port))
|
||||
}
|
||||
|
||||
30
client/client_stats_endpoint.go
Normal file
30
client/client_stats_endpoint.go
Normal file
@@ -0,0 +1,30 @@
|
||||
package client
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
metrics "github.com/armon/go-metrics"
|
||||
"github.com/hashicorp/nomad/client/structs"
|
||||
nstructs "github.com/hashicorp/nomad/nomad/structs"
|
||||
)
|
||||
|
||||
// ClientStats endpoint is used for retrieving stats about a client
|
||||
type ClientStats struct {
|
||||
c *Client
|
||||
}
|
||||
|
||||
// Stats is used to retrieve the Clients stats.
|
||||
func (s *ClientStats) Stats(args *structs.ClientStatsRequest, reply *structs.ClientStatsResponse) error {
|
||||
defer metrics.MeasureSince([]string{"nomad", "client", "client_stats", "stats"}, time.Now())
|
||||
|
||||
// Check node read permissions
|
||||
if aclObj, err := s.c.ResolveToken(args.AuthToken); err != nil {
|
||||
return err
|
||||
} else if aclObj != nil && !aclObj.AllowNodeRead() {
|
||||
return nstructs.ErrPermissionDenied
|
||||
}
|
||||
|
||||
clientStats := s.c.StatsReporter()
|
||||
reply.HostStats = clientStats.LatestHostStats()
|
||||
return nil
|
||||
}
|
||||
85
client/client_stats_endpoint_test.go
Normal file
85
client/client_stats_endpoint_test.go
Normal file
@@ -0,0 +1,85 @@
|
||||
package client
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/hashicorp/nomad/acl"
|
||||
"github.com/hashicorp/nomad/client/config"
|
||||
"github.com/hashicorp/nomad/client/structs"
|
||||
"github.com/hashicorp/nomad/nomad/mock"
|
||||
nstructs "github.com/hashicorp/nomad/nomad/structs"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestClientStats_Stats(t *testing.T) {
|
||||
t.Parallel()
|
||||
require := require.New(t)
|
||||
client := testClient(t, nil)
|
||||
|
||||
req := &structs.ClientStatsRequest{}
|
||||
var resp structs.ClientStatsResponse
|
||||
require.Nil(client.ClientRPC("ClientStats.Stats", &req, &resp))
|
||||
require.NotNil(resp.HostStats)
|
||||
require.NotNil(resp.HostStats.AllocDirStats)
|
||||
require.NotZero(resp.HostStats.Uptime)
|
||||
}
|
||||
|
||||
func TestClientStats_Stats_ACL(t *testing.T) {
|
||||
t.Parallel()
|
||||
require := require.New(t)
|
||||
server, addr, root := testACLServer(t, nil)
|
||||
defer server.Shutdown()
|
||||
|
||||
client := testClient(t, func(c *config.Config) {
|
||||
c.Servers = []string{addr}
|
||||
c.ACLEnabled = true
|
||||
})
|
||||
defer client.Shutdown()
|
||||
|
||||
// Try request without a token and expect failure
|
||||
{
|
||||
req := &structs.ClientStatsRequest{}
|
||||
var resp structs.ClientStatsResponse
|
||||
err := client.ClientRPC("ClientStats.Stats", &req, &resp)
|
||||
require.NotNil(err)
|
||||
require.EqualError(err, nstructs.ErrPermissionDenied.Error())
|
||||
}
|
||||
|
||||
// Try request with an invalid token and expect failure
|
||||
{
|
||||
token := mock.CreatePolicyAndToken(t, server.State(), 1005, "invalid", mock.NodePolicy(acl.PolicyDeny))
|
||||
req := &structs.ClientStatsRequest{}
|
||||
req.AuthToken = token.SecretID
|
||||
|
||||
var resp structs.ClientStatsResponse
|
||||
err := client.ClientRPC("ClientStats.Stats", &req, &resp)
|
||||
|
||||
require.NotNil(err)
|
||||
require.EqualError(err, nstructs.ErrPermissionDenied.Error())
|
||||
}
|
||||
|
||||
// Try request with a valid token
|
||||
{
|
||||
token := mock.CreatePolicyAndToken(t, server.State(), 1007, "valid", mock.NodePolicy(acl.PolicyRead))
|
||||
req := &structs.ClientStatsRequest{}
|
||||
req.AuthToken = token.SecretID
|
||||
|
||||
var resp structs.ClientStatsResponse
|
||||
err := client.ClientRPC("ClientStats.Stats", &req, &resp)
|
||||
|
||||
require.Nil(err)
|
||||
require.NotNil(resp.HostStats)
|
||||
}
|
||||
|
||||
// Try request with a management token
|
||||
{
|
||||
req := &structs.ClientStatsRequest{}
|
||||
req.AuthToken = root.SecretID
|
||||
|
||||
var resp structs.ClientStatsResponse
|
||||
err := client.ClientRPC("ClientStats.Stats", &req, &resp)
|
||||
|
||||
require.Nil(err)
|
||||
require.NotNil(resp.HostStats)
|
||||
}
|
||||
}
|
||||
@@ -18,6 +18,7 @@ import (
|
||||
"github.com/hashicorp/nomad/client/fingerprint"
|
||||
"github.com/hashicorp/nomad/command/agent/consul"
|
||||
"github.com/hashicorp/nomad/helper"
|
||||
"github.com/hashicorp/nomad/helper/testlog"
|
||||
"github.com/hashicorp/nomad/helper/uuid"
|
||||
"github.com/hashicorp/nomad/nomad"
|
||||
"github.com/hashicorp/nomad/nomad/mock"
|
||||
@@ -122,7 +123,7 @@ func testClient(t *testing.T, cb func(c *config.Config)) *Client {
|
||||
cb(conf)
|
||||
}
|
||||
|
||||
logger := log.New(conf.LogOutput, "", log.LstdFlags)
|
||||
logger := testlog.Logger(t)
|
||||
catalog := consul.NewMockCatalog(logger)
|
||||
mockService := newMockConsulServiceClient()
|
||||
mockService.logger = logger
|
||||
|
||||
92
client/rpc.go
Normal file
92
client/rpc.go
Normal file
@@ -0,0 +1,92 @@
|
||||
package client
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"net/rpc"
|
||||
"strings"
|
||||
|
||||
multierror "github.com/hashicorp/go-multierror"
|
||||
"github.com/hashicorp/nomad/helper/codec"
|
||||
)
|
||||
|
||||
// rpcEndpoints holds the RPC endpoints
|
||||
type rpcEndpoints struct {
|
||||
ClientStats *ClientStats
|
||||
}
|
||||
|
||||
// ClientRPC is used to make a local, client only RPC call
|
||||
func (c *Client) ClientRPC(method string, args interface{}, reply interface{}) error {
|
||||
codec := &codec.InmemCodec{
|
||||
Method: method,
|
||||
Args: args,
|
||||
Reply: reply,
|
||||
}
|
||||
if err := c.rpcServer.ServeRequest(codec); err != nil {
|
||||
return err
|
||||
}
|
||||
return codec.Err
|
||||
}
|
||||
|
||||
// RPC is used to forward an RPC call to a nomad server, or fail if no servers.
|
||||
func (c *Client) RPC(method string, args interface{}, reply interface{}) error {
|
||||
// Invoke the RPCHandler if it exists
|
||||
if c.config.RPCHandler != nil {
|
||||
return c.config.RPCHandler.RPC(method, args, reply)
|
||||
}
|
||||
|
||||
servers := c.servers.all()
|
||||
if len(servers) == 0 {
|
||||
return noServersErr
|
||||
}
|
||||
|
||||
var mErr multierror.Error
|
||||
for _, s := range servers {
|
||||
// Make the RPC request
|
||||
if err := c.connPool.RPC(c.Region(), s.addr, c.RPCMajorVersion(), method, args, reply); err != nil {
|
||||
errmsg := fmt.Errorf("RPC failed to server %s: %v", s.addr, err)
|
||||
mErr.Errors = append(mErr.Errors, errmsg)
|
||||
c.logger.Printf("[DEBUG] client: %v", errmsg)
|
||||
c.servers.failed(s)
|
||||
continue
|
||||
}
|
||||
c.servers.good(s)
|
||||
return nil
|
||||
}
|
||||
|
||||
return mErr.ErrorOrNil()
|
||||
}
|
||||
|
||||
// setupClientRpc is used to setup the Client's RPC endpoints
|
||||
func (c *Client) setupClientRpc() {
|
||||
// Initialize the RPC handlers
|
||||
c.endpoints.ClientStats = &ClientStats{c}
|
||||
|
||||
// Create the RPC Server
|
||||
c.rpcServer = rpc.NewServer()
|
||||
|
||||
// Register the endpoints with the RPC server
|
||||
c.setupClientRpcServer(c.rpcServer)
|
||||
}
|
||||
|
||||
// setupClientRpcServer is used to populate a client RPC server with endpoints.
|
||||
func (c *Client) setupClientRpcServer(server *rpc.Server) {
|
||||
// Register the endpoints
|
||||
server.Register(c.endpoints.ClientStats)
|
||||
}
|
||||
|
||||
// resolveServer takes a server's address as a string and returns its resolved
// net.Addr or an error.
//
// The address may be "host:port" or a bare host. A bare host (a hostname, an
// IPv4 literal, or a bracketed IPv6 literal such as "[::1]") gets the default
// RPC port 4647. Any other parse failure is returned unchanged. On error the
// returned net.Addr is a true nil interface.
func resolveServer(s string) (net.Addr, error) {
	const defaultClientPort = "4647" // default client RPC port

	host, port, err := net.SplitHostPort(s)
	if err != nil {
		// A missing port is only distinguishable by the error text.
		if !strings.Contains(err.Error(), "missing port") {
			return nil, err
		}
		host, port = s, defaultClientPort

		// "[::1]" also reports a missing port. JoinHostPort adds its own
		// brackets around any host containing a colon, so strip the caller's
		// brackets to avoid producing "[[::1]]:4647".
		if len(host) >= 2 && host[0] == '[' && host[len(host)-1] == ']' {
			host = host[1 : len(host)-1]
		}
	}

	addr, err := net.ResolveTCPAddr("tcp", net.JoinHostPort(host, port))
	if err != nil {
		// Return a literal nil so callers never see a non-nil net.Addr
		// wrapping a nil *net.TCPAddr.
		return nil, err
	}
	return addr, nil
}
|
||||
@@ -93,7 +93,12 @@ func NewHostStatsCollector(logger *log.Logger, allocDir string) *HostStatsCollec
|
||||
func (h *HostStatsCollector) Collect() error {
|
||||
h.hostStatsLock.Lock()
|
||||
defer h.hostStatsLock.Unlock()
|
||||
return h.collectLocked()
|
||||
}
|
||||
|
||||
// collectLocked collects stats related to resource usage of the host but should
|
||||
// be called with the lock held.
|
||||
func (h *HostStatsCollector) collectLocked() error {
|
||||
hs := &HostStats{Timestamp: time.Now().UTC().UnixNano()}
|
||||
|
||||
// Determine up-time
|
||||
@@ -185,6 +190,13 @@ func (h *HostStatsCollector) collectDiskStats() ([]*DiskStats, error) {
|
||||
func (h *HostStatsCollector) Stats() *HostStats {
|
||||
h.hostStatsLock.RLock()
|
||||
defer h.hostStatsLock.RUnlock()
|
||||
|
||||
if h.hostStats == nil {
|
||||
if err := h.collectLocked(); err != nil {
|
||||
h.logger.Printf("[WARN] client: error fetching host resource usage stats: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
return h.hostStats
|
||||
}
|
||||
|
||||
|
||||
@@ -6,9 +6,22 @@ import (
|
||||
"strconv"
|
||||
|
||||
"github.com/hashicorp/nomad/client/config"
|
||||
"github.com/hashicorp/nomad/client/stats"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
)
|
||||
|
||||
// ClientStatsRequest is used to request stats about a Node.
|
||||
type ClientStatsRequest struct {
|
||||
NodeID string
|
||||
structs.QueryOptions
|
||||
}
|
||||
|
||||
// ClientStatsResponse is used to return statistics about a node.
|
||||
type ClientStatsResponse struct {
|
||||
HostStats *stats.HostStats
|
||||
structs.QueryMeta
|
||||
}
|
||||
|
||||
// MemoryStats holds memory usage related stats
|
||||
type MemoryStats struct {
|
||||
RSS uint64
|
||||
|
||||
@@ -3,7 +3,7 @@ package agent
|
||||
import (
|
||||
"net/http"
|
||||
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
cstructs "github.com/hashicorp/nomad/client/structs"
|
||||
)
|
||||
|
||||
func (s *HTTPServer) ClientStatsRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
||||
@@ -11,16 +11,15 @@ func (s *HTTPServer) ClientStatsRequest(resp http.ResponseWriter, req *http.Requ
|
||||
return nil, clientNotRunning
|
||||
}
|
||||
|
||||
var secret string
|
||||
s.parseToken(req, &secret)
|
||||
// Parse the ACL token
|
||||
var args cstructs.ClientStatsRequest
|
||||
s.parseToken(req, &args.AuthToken)
|
||||
|
||||
// Check node read permissions
|
||||
if aclObj, err := s.agent.Client().ResolveToken(secret); err != nil {
|
||||
// Make the RPC
|
||||
var reply cstructs.ClientStatsResponse
|
||||
if err := s.agent.Client().ClientRPC("ClientStats.Stats", &args, &reply); err != nil {
|
||||
return nil, err
|
||||
} else if aclObj != nil && !aclObj.AllowNodeRead() {
|
||||
return nil, structs.ErrPermissionDenied
|
||||
}
|
||||
|
||||
clientStats := s.agent.client.StatsReporter()
|
||||
return clientStats.LatestHostStats(), nil
|
||||
return reply.HostStats, nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user