diff --git a/client/client.go b/client/client.go index e27d2d62c..1b1332b2c 100644 --- a/client/client.go +++ b/client/client.go @@ -6,6 +6,7 @@ import ( "io/ioutil" "log" "net" + "net/rpc" "os" "path/filepath" "strconv" @@ -157,6 +158,10 @@ type Client struct { // clientACLResolver holds the ACL resolution state clientACLResolver + // rpcServer is used to serve RPCs by the local agent. + rpcServer *rpc.Server + endpoints rpcEndpoints + // baseLabels are used when emitting tagged metrics. All client metrics will // have these tags, and optionally more. baseLabels []metrics.Label @@ -202,6 +207,9 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic return nil, fmt.Errorf("failed to initialize client: %v", err) } + // Setup the clients RPC server + c.setupClientRpc() + // Initialize the ACL state if err := c.clientACLResolver.init(); err != nil { return nil, fmt.Errorf("failed to initialize ACL state: %v", err) @@ -474,35 +482,6 @@ func (c *Client) Shutdown() error { return c.saveState() } -// RPC is used to forward an RPC call to a nomad server, or fail if no servers. -func (c *Client) RPC(method string, args interface{}, reply interface{}) error { - // Invoke the RPCHandler if it exists - if c.config.RPCHandler != nil { - return c.config.RPCHandler.RPC(method, args, reply) - } - - servers := c.servers.all() - if len(servers) == 0 { - return noServersErr - } - - var mErr multierror.Error - for _, s := range servers { - // Make the RPC request - if err := c.connPool.RPC(c.Region(), s.addr, c.RPCMajorVersion(), method, args, reply); err != nil { - errmsg := fmt.Errorf("RPC failed to server %s: %v", s.addr, err) - mErr.Errors = append(mErr.Errors, errmsg) - c.logger.Printf("[DEBUG] client: %v", errmsg) - c.servers.failed(s) - continue - } - c.servers.good(s) - return nil - } - - return mErr.ErrorOrNil() -} - // Stats is used to return statistics for debugging and insight // for various sub-systems func (c *Client) Stats() map[string]map[string]string { @@ -2164,19 +2143,3 @@ func (c *Client) allAllocs() map[string]*structs.Allocation { } return allocs } - -// resolveServer given a sever's address as a string, return it's resolved -// net.Addr or an error. -func resolveServer(s string) (net.Addr, error) { - const defaultClientPort = "4647" // default client RPC port - host, port, err := net.SplitHostPort(s) - if err != nil { - if strings.Contains(err.Error(), "missing port") { - host = s - port = defaultClientPort - } else { - return nil, err - } - } - return net.ResolveTCPAddr("tcp", net.JoinHostPort(host, port)) -} diff --git a/client/client_stats_endpoint.go b/client/client_stats_endpoint.go new file mode 100644 index 000000000..3506e3c04 --- /dev/null +++ b/client/client_stats_endpoint.go @@ -0,0 +1,30 @@ +package client + +import ( + "time" + + metrics "github.com/armon/go-metrics" + "github.com/hashicorp/nomad/client/structs" + nstructs "github.com/hashicorp/nomad/nomad/structs" +) + +// ClientStats endpoint is used for retrieving stats about a client +type ClientStats struct { + c *Client +} + +// Stats is used to retrieve the Clients stats. +func (s *ClientStats) Stats(args *structs.ClientStatsRequest, reply *structs.ClientStatsResponse) error { + defer metrics.MeasureSince([]string{"nomad", "client", "client_stats", "stats"}, time.Now()) + + // Check node read permissions + if aclObj, err := s.c.ResolveToken(args.AuthToken); err != nil { + return err + } else if aclObj != nil && !aclObj.AllowNodeRead() { + return nstructs.ErrPermissionDenied + } + + clientStats := s.c.StatsReporter() + reply.HostStats = clientStats.LatestHostStats() + return nil +} diff --git a/client/client_stats_endpoint_test.go b/client/client_stats_endpoint_test.go new file mode 100644 index 000000000..8662a8c4a --- /dev/null +++ b/client/client_stats_endpoint_test.go @@ -0,0 +1,85 @@ +package client + +import ( + "testing" + + "github.com/hashicorp/nomad/acl" + "github.com/hashicorp/nomad/client/config" + "github.com/hashicorp/nomad/client/structs" + "github.com/hashicorp/nomad/nomad/mock" + nstructs "github.com/hashicorp/nomad/nomad/structs" + "github.com/stretchr/testify/require" +) + +func TestClientStats_Stats(t *testing.T) { + t.Parallel() + require := require.New(t) + client := testClient(t, nil) + + req := &structs.ClientStatsRequest{} + var resp structs.ClientStatsResponse + require.Nil(client.ClientRPC("ClientStats.Stats", &req, &resp)) + require.NotNil(resp.HostStats) + require.NotNil(resp.HostStats.AllocDirStats) + require.NotZero(resp.HostStats.Uptime) +} + +func TestClientStats_Stats_ACL(t *testing.T) { + t.Parallel() + require := require.New(t) + server, addr, root := testACLServer(t, nil) + defer server.Shutdown() + + client := testClient(t, func(c *config.Config) { + c.Servers = []string{addr} + c.ACLEnabled = true + }) + defer client.Shutdown() + + // Try request without a token and expect failure + { + req := &structs.ClientStatsRequest{} + var resp structs.ClientStatsResponse + err := client.ClientRPC("ClientStats.Stats", &req, &resp) + require.NotNil(err) + require.EqualError(err, nstructs.ErrPermissionDenied.Error()) + } + + // Try request with an invalid token and expect failure + { + token := mock.CreatePolicyAndToken(t, server.State(), 1005, "invalid", mock.NodePolicy(acl.PolicyDeny)) + req := &structs.ClientStatsRequest{} + req.AuthToken = token.SecretID + + var resp structs.ClientStatsResponse + err := client.ClientRPC("ClientStats.Stats", &req, &resp) + + require.NotNil(err) + require.EqualError(err, nstructs.ErrPermissionDenied.Error()) + } + + // Try request with a valid token + { + token := mock.CreatePolicyAndToken(t, server.State(), 1007, "valid", mock.NodePolicy(acl.PolicyRead)) + req := &structs.ClientStatsRequest{} + req.AuthToken = token.SecretID + + var resp structs.ClientStatsResponse + err := client.ClientRPC("ClientStats.Stats", &req, &resp) + + require.Nil(err) + require.NotNil(resp.HostStats) + } + + // Try request with a management token + { + req := &structs.ClientStatsRequest{} + req.AuthToken = root.SecretID + + var resp structs.ClientStatsResponse + err := client.ClientRPC("ClientStats.Stats", &req, &resp) + + require.Nil(err) + require.NotNil(resp.HostStats) + } +} diff --git a/client/client_test.go b/client/client_test.go index da6a14fb7..f6c647b3a 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -18,6 +18,7 @@ import ( "github.com/hashicorp/nomad/client/fingerprint" "github.com/hashicorp/nomad/command/agent/consul" "github.com/hashicorp/nomad/helper" + "github.com/hashicorp/nomad/helper/testlog" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad" "github.com/hashicorp/nomad/nomad/mock" @@ -122,7 +123,7 @@ func testClient(t *testing.T, cb func(c *config.Config)) *Client { cb(conf) } - logger := log.New(conf.LogOutput, "", log.LstdFlags) + logger := testlog.Logger(t) catalog := consul.NewMockCatalog(logger) mockService := newMockConsulServiceClient() mockService.logger = logger diff --git a/client/rpc.go b/client/rpc.go new file mode 100644 index 000000000..1a658dde5 --- /dev/null +++ b/client/rpc.go @@ -0,0 +1,92 @@ +package client + +import ( + "fmt" + "net" + "net/rpc" + "strings" + + multierror "github.com/hashicorp/go-multierror" + "github.com/hashicorp/nomad/helper/codec" +) + +// rpcEndpoints holds the RPC endpoints +type rpcEndpoints struct { + ClientStats *ClientStats +} + +// ClientRPC is used to make a local, client only RPC call +func (c *Client) ClientRPC(method string, args interface{}, reply interface{}) error { + codec := &codec.InmemCodec{ + Method: method, + Args: args, + Reply: reply, + } + if err := c.rpcServer.ServeRequest(codec); err != nil { + return err + } + return codec.Err +} + +// RPC is used to forward an RPC call to a nomad server, or fail if no servers. +func (c *Client) RPC(method string, args interface{}, reply interface{}) error { + // Invoke the RPCHandler if it exists + if c.config.RPCHandler != nil { + return c.config.RPCHandler.RPC(method, args, reply) + } + + servers := c.servers.all() + if len(servers) == 0 { + return noServersErr + } + + var mErr multierror.Error + for _, s := range servers { + // Make the RPC request + if err := c.connPool.RPC(c.Region(), s.addr, c.RPCMajorVersion(), method, args, reply); err != nil { + errmsg := fmt.Errorf("RPC failed to server %s: %v", s.addr, err) + mErr.Errors = append(mErr.Errors, errmsg) + c.logger.Printf("[DEBUG] client: %v", errmsg) + c.servers.failed(s) + continue + } + c.servers.good(s) + return nil + } + + return mErr.ErrorOrNil() +} + +// setupClientRpc is used to setup the Client's RPC endpoints +func (c *Client) setupClientRpc() { + // Initialize the RPC handlers + c.endpoints.ClientStats = &ClientStats{c} + + // Create the RPC Server + c.rpcServer = rpc.NewServer() + + // Register the endpoints with the RPC server + c.setupClientRpcServer(c.rpcServer) +} + +// setupClientRpcServer is used to populate a client RPC server with endpoints. +func (c *Client) setupClientRpcServer(server *rpc.Server) { + // Register the endpoints + server.Register(c.endpoints.ClientStats) +} + +// resolveServer given a sever's address as a string, return it's resolved +// net.Addr or an error. +func resolveServer(s string) (net.Addr, error) { + const defaultClientPort = "4647" // default client RPC port + host, port, err := net.SplitHostPort(s) + if err != nil { + if strings.Contains(err.Error(), "missing port") { + host = s + port = defaultClientPort + } else { + return nil, err + } + } + return net.ResolveTCPAddr("tcp", net.JoinHostPort(host, port)) +} diff --git a/client/stats/host.go b/client/stats/host.go index 8f0f92377..23826b10a 100644 --- a/client/stats/host.go +++ b/client/stats/host.go @@ -93,7 +93,12 @@ func NewHostStatsCollector(logger *log.Logger, allocDir string) *HostStatsCollec func (h *HostStatsCollector) Collect() error { h.hostStatsLock.Lock() defer h.hostStatsLock.Unlock() + return h.collectLocked() +} +// collectLocked collects stats related to resource usage of the host but should +// be called with the lock held. +func (h *HostStatsCollector) collectLocked() error { hs := &HostStats{Timestamp: time.Now().UTC().UnixNano()} // Determine up-time @@ -185,6 +190,13 @@ func (h *HostStatsCollector) collectDiskStats() ([]*DiskStats, error) { func (h *HostStatsCollector) Stats() *HostStats { h.hostStatsLock.RLock() defer h.hostStatsLock.RUnlock() + + if h.hostStats == nil { + if err := h.collectLocked(); err != nil { + h.logger.Printf("[WARN] client: error fetching host resource usage stats: %v", err) + } + } + return h.hostStats } diff --git a/client/structs/structs.go b/client/structs/structs.go index 97887232d..0fdb32410 100644 --- a/client/structs/structs.go +++ b/client/structs/structs.go @@ -6,9 +6,22 @@ import ( "strconv" "github.com/hashicorp/nomad/client/config" + "github.com/hashicorp/nomad/client/stats" "github.com/hashicorp/nomad/nomad/structs" ) +// ClientStatsRequest is used to request stats about a Node. +type ClientStatsRequest struct { + NodeID string + structs.QueryOptions +} + +// ClientStatsResponse is used to return statistics about a node. +type ClientStatsResponse struct { + HostStats *stats.HostStats + structs.QueryMeta +} + // MemoryStats holds memory usage related stats type MemoryStats struct { RSS uint64 diff --git a/command/agent/stats_endpoint.go b/command/agent/stats_endpoint.go index ba18d9c28..559dfb4f9 100644 --- a/command/agent/stats_endpoint.go +++ b/command/agent/stats_endpoint.go @@ -3,7 +3,7 @@ package agent import ( "net/http" - "github.com/hashicorp/nomad/nomad/structs" + cstructs "github.com/hashicorp/nomad/client/structs" ) func (s *HTTPServer) ClientStatsRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) { @@ -11,16 +11,15 @@ func (s *HTTPServer) ClientStatsRequest(resp http.ResponseWriter, req *http.Requ return nil, clientNotRunning } - var secret string - s.parseToken(req, &secret) + // Parse the ACL token + var args cstructs.ClientStatsRequest + s.parseToken(req, &args.AuthToken) - // Check node read permissions - if aclObj, err := s.agent.Client().ResolveToken(secret); err != nil { + // Make the RPC + var reply cstructs.ClientStatsResponse + if err := s.agent.Client().ClientRPC("ClientStats.Stats", &args, &reply); err != nil { return nil, err - } else if aclObj != nil && !aclObj.AllowNodeRead() { - return nil, structs.ErrPermissionDenied } - clientStats := s.agent.client.StatsReporter() - return clientStats.LatestHostStats(), nil + return reply.HostStats, nil }