Task API via Unix Domain Socket (#15864)

This change introduces the Task API: a portable way for tasks to access Nomad's HTTP API. This particular implementation uses a Unix Domain Socket and, unlike the agent's HTTP API, always requires authentication even if ACLs are disabled.

This PR contains the core feature and tests but followup work is required for the following TODO items:

- Docs - might do in a followup since dynamic node metadata / task api / workload id all need to interlink
- Unit tests for auth middleware
- Caching for auth middleware
- Rate limiting on negative lookups for auth middleware

---------

Co-authored-by: Seth Hoenig <shoenig@duck.com>
This commit is contained in:
Michael Schurter
2023-02-06 11:31:22 -08:00
committed by GitHub
parent a4254f9118
commit 9bab96ebd3
23 changed files with 876 additions and 38 deletions

View File

@@ -115,7 +115,11 @@ type Agent struct {
builtinListener net.Listener
builtinDialer *bufconndialer.BufConnWrapper
InmemSink *metrics.InmemSink
// builtinServer is an HTTP server for attaching per-task listeners. Always
// requires auth.
builtinServer *builtinAPI
inmemSink *metrics.InmemSink
}
// NewAgent is used to create a new agent with the given configuration
@@ -124,7 +128,7 @@ func NewAgent(config *Config, logger log.InterceptLogger, logOutput io.Writer, i
config: config,
logOutput: logOutput,
shutdownCh: make(chan struct{}),
InmemSink: inmem,
inmemSink: inmem,
}
// Create the loggers
@@ -1020,6 +1024,11 @@ func (a *Agent) setupClient() error {
a.builtinListener, a.builtinDialer = bufconndialer.New()
conf.TemplateDialer = a.builtinDialer
// Initialize builtin API server here for use in the client, but it won't
// accept connections until the HTTP servers are created.
a.builtinServer = newBuiltinAPI()
conf.APIListenerRegistrar = a.builtinServer
nomadClient, err := client.NewClient(
conf, a.consulCatalog, a.consulProxies, a.consulService, nil)
if err != nil {
@@ -1300,6 +1309,11 @@ func (a *Agent) GetConfig() *Config {
return a.config
}
// GetMetricsSink returns the metrics sink.
func (a *Agent) GetMetricsSink() *metrics.InmemSink {
return a.inmemSink
}
// setupConsul creates the Consul client and starts its main Run loop.
func (a *Agent) setupConsul(consulConfig *config.ConsulConfig) error {
apiConf, err := consulConfig.ApiConfig()

View File

@@ -440,7 +440,7 @@ func (s *HTTPServer) listServers(resp http.ResponseWriter, req *http.Request) (i
return nil, structs.ErrPermissionDenied
}
peers := s.agent.client.GetServers()
peers := client.GetServers()
sort.Strings(peers)
return peers, nil
}
@@ -468,9 +468,9 @@ func (s *HTTPServer) updateServers(resp http.ResponseWriter, req *http.Request)
}
// Set the servers list into the client
s.agent.logger.Trace("adding servers to the client's primary server list", "servers", servers, "path", "/v1/agent/servers", "method", "PUT")
s.logger.Trace("adding servers to the client's primary server list", "servers", servers, "path", "/v1/agent/servers", "method", "PUT")
if _, err := client.SetServers(servers); err != nil {
s.agent.logger.Error("failed adding servers to client's server list", "servers", servers, "error", err, "path", "/v1/agent/servers", "method", "PUT")
s.logger.Error("failed adding servers to client's server list", "servers", servers, "error", err, "path", "/v1/agent/servers", "method", "PUT")
//TODO is this the right error to return?
return nil, CodedError(400, err.Error())
}
@@ -708,7 +708,7 @@ func (s *HTTPServer) AgentHostRequest(resp http.ResponseWriter, req *http.Reques
// The RPC endpoint actually forwards the request to the correct
// agent, but we need to use the correct RPC interface.
localClient, remoteClient, localServer := s.rpcHandlerForNode(lookupNodeID)
s.agent.logger.Debug("s.rpcHandlerForNode()", "lookupNodeID", lookupNodeID, "serverID", serverID, "nodeID", nodeID, "localClient", localClient, "remoteClient", remoteClient, "localServer", localServer)
s.logger.Debug("s.rpcHandlerForNode()", "lookupNodeID", lookupNodeID, "serverID", serverID, "nodeID", nodeID, "localClient", localClient, "remoteClient", remoteClient, "localServer", localServer)
// Make the RPC call
if localClient {

View File

@@ -222,7 +222,7 @@ func (s *HTTPServer) ClientAllocRequest(resp http.ResponseWriter, req *http.Requ
case "exec":
return s.allocExec(allocID, resp, req)
case "snapshot":
if s.agent.client == nil {
if s.agent.Client() == nil {
return nil, clientNotRunning
}
return s.allocSnapshot(allocID, resp, req)

View File

@@ -46,7 +46,7 @@ func TestConsul_Integration(t *testing.T) {
// Create an embedded Consul server
testconsul, err := testutil.NewTestServerConfigT(t, func(c *testutil.TestServerConfig) {
c.Peering = nil // fix for older versions of Consul (<1.13.0) that don't support peering
c.Peering = nil // fix for older versions of Consul (<1.13.0) that don't support peering
// If -v wasn't specified squelch consul logging
if !testing.Verbose() {
c.Stdout = ioutil.Discard
@@ -61,6 +61,7 @@ func TestConsul_Integration(t *testing.T) {
conf := config.DefaultConfig()
conf.Node = mock.Node()
conf.ConsulConfig.Addr = testconsul.HTTPAddr
conf.APIListenerRegistrar = config.NoopAPIListenerRegistrar{}
consulConfig, err := conf.ConsulConfig.ApiConfig()
if err != nil {
t.Fatalf("error generating consul config: %v", err)

View File

@@ -2,6 +2,7 @@ package agent
import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"errors"
@@ -26,8 +27,10 @@ import (
"golang.org/x/time/rate"
"github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/client"
"github.com/hashicorp/nomad/helper/noxssrw"
"github.com/hashicorp/nomad/helper/tlsutil"
"github.com/hashicorp/nomad/nomad"
"github.com/hashicorp/nomad/nomad/structs"
)
@@ -74,9 +77,18 @@ var (
type handlerFn func(resp http.ResponseWriter, req *http.Request) (interface{}, error)
type handlerByteFn func(resp http.ResponseWriter, req *http.Request) ([]byte, error)
type RPCer interface {
RPC(string, any, any) error
Server() *nomad.Server
Client() *client.Client
Stats() map[string]map[string]string
GetConfig() *Config
GetMetricsSink() *metrics.InmemSink
}
// HTTPServer is used to wrap an Agent and expose it over an HTTP interface
type HTTPServer struct {
agent *Agent
agent RPCer
mux *http.ServeMux
listener net.Listener
listenerCh chan struct{}
@@ -170,7 +182,7 @@ func NewHTTPServers(agent *Agent, config *Config) ([]*HTTPServer, error) {
srvs = append(srvs, srv)
}
// This HTTP server is only create when running in client mode, otherwise
// This HTTP server is only created when running in client mode, otherwise
// the builtinDialer and builtinListener will be nil.
if agent.builtinDialer != nil && agent.builtinListener != nil {
srv := &HTTPServer{
@@ -185,12 +197,15 @@ func NewHTTPServers(agent *Agent, config *Config) ([]*HTTPServer, error) {
srv.registerHandlers(config.EnableDebug)
// builtinServer adds a wrapper to always authenticate requests
httpServer := http.Server{
Addr: srv.Addr,
Handler: srv.mux,
Handler: newAuthMiddleware(srv, srv.mux),
ErrorLog: newHTTPServerLogger(srv.logger),
}
agent.builtinServer.SetServer(&httpServer)
go func() {
defer close(srv.listenerCh)
httpServer.Serve(agent.builtinListener)
@@ -465,7 +480,8 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) {
s.mux.Handle("/v1/vars", wrapCORS(s.wrap(s.VariablesListRequest)))
s.mux.Handle("/v1/var/", wrapCORSWithAllowedMethods(s.wrap(s.VariableSpecificRequest), "HEAD", "GET", "PUT", "DELETE"))
uiConfigEnabled := s.agent.config.UI != nil && s.agent.config.UI.Enabled
agentConfig := s.agent.GetConfig()
uiConfigEnabled := agentConfig.UI != nil && agentConfig.UI.Enabled
if uiEnabled && uiConfigEnabled {
s.mux.Handle("/ui/", http.StripPrefix("/ui/", s.handleUI(http.FileServer(&UIAssetWrapper{FileSystem: assetFS()}))))
@@ -484,7 +500,7 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) {
s.mux.Handle("/", s.handleRootFallthrough())
if enableDebug {
if !s.agent.config.DevMode {
if !agentConfig.DevMode {
s.logger.Warn("enable_debug is set to true. This is insecure and should not be enabled in production")
}
s.mux.HandleFunc("/debug/pprof/", pprof.Index)
@@ -498,6 +514,54 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) {
s.registerEnterpriseHandlers()
}
// builtinAPI is a wrapper around serving the HTTP API to arbitrary listeners
// such as the Task API. It is necessary because the HTTP servers are created
// *after* the client has been initialized, so this wrapper blocks Serve
// requests from task api hooks until the HTTP server is setup and ready to
// accept from new listeners.
//
// bufconndialer provides similar functionality to consul-template except it
// satisfies the Dialer API as opposed to the Serve(Listener) API.
type builtinAPI struct {
srv *http.Server
srvReadyCh chan struct{}
}
func newBuiltinAPI() *builtinAPI {
return &builtinAPI{
srvReadyCh: make(chan struct{}),
}
}
// SetServer sets the API HTTP server for Serve to add listeners to.
//
// It must be called exactly once and will panic if called more than once.
func (b *builtinAPI) SetServer(srv *http.Server) {
select {
case <-b.srvReadyCh:
panic(fmt.Sprintf("SetServer called twice. first=%p second=%p", b.srv, srv))
default:
}
b.srv = srv
close(b.srvReadyCh)
}
// Serve the HTTP API on the listener unless the context is canceled before the
// HTTP API is ready to serve listeners. A non-nil error will always be
// returned, but http.ErrServerClosed and net.ErrClosed can likely be ignored
// as they indicate the server or listener is being shutdown.
func (b *builtinAPI) Serve(ctx context.Context, l net.Listener) error {
select {
case <-ctx.Done():
// Caller canceled context before server was ready.
return ctx.Err()
case <-b.srvReadyCh:
// Server ready for listeners! Continue on...
}
return b.srv.Serve(l)
}
// HTTPCodedError is used to provide the HTTP error code
type HTTPCodedError interface {
error
@@ -591,7 +655,7 @@ func errCodeFromHandler(err error) (int, string) {
// wrap is used to wrap functions to make them more convenient
func (s *HTTPServer) wrap(handler func(resp http.ResponseWriter, req *http.Request) (interface{}, error)) func(resp http.ResponseWriter, req *http.Request) {
f := func(resp http.ResponseWriter, req *http.Request) {
setHeaders(resp, s.agent.config.HTTPAPIResponseHeaders)
setHeaders(resp, s.agent.GetConfig().HTTPAPIResponseHeaders)
// Invoke the handler
reqURL := req.URL.String()
start := time.Now()
@@ -673,7 +737,7 @@ func (s *HTTPServer) wrap(handler func(resp http.ResponseWriter, req *http.Reque
// Handler functions are responsible for setting Content-Type Header
func (s *HTTPServer) wrapNonJSON(handler func(resp http.ResponseWriter, req *http.Request) ([]byte, error)) func(resp http.ResponseWriter, req *http.Request) {
f := func(resp http.ResponseWriter, req *http.Request) {
setHeaders(resp, s.agent.config.HTTPAPIResponseHeaders)
setHeaders(resp, s.agent.GetConfig().HTTPAPIResponseHeaders)
// Invoke the handler
reqURL := req.URL.String()
start := time.Now()
@@ -817,7 +881,7 @@ func (s *HTTPServer) parseRegion(req *http.Request, r *string) {
if other := req.URL.Query().Get("region"); other != "" {
*r = other
} else if *r == "" {
*r = s.agent.config.Region
*r = s.agent.GetConfig().Region
}
}
@@ -976,3 +1040,55 @@ func wrapCORS(f func(http.ResponseWriter, *http.Request)) http.Handler {
func wrapCORSWithAllowedMethods(f func(http.ResponseWriter, *http.Request), methods ...string) http.Handler {
return allowCORSWithMethods(methods...).Handler(http.HandlerFunc(f))
}
// authMiddleware implements the http.Handler interface to enforce
// authentication for *all* requests. Even with ACLs enabled there are
// endpoints which are accessible without authenticating. This middleware is
// used for the Task API to enfoce authentication for all API access.
type authMiddleware struct {
srv *HTTPServer
wrapped http.Handler
}
func newAuthMiddleware(srv *HTTPServer, h http.Handler) http.Handler {
return &authMiddleware{
srv: srv,
wrapped: h,
}
}
func (a *authMiddleware) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
args := structs.GenericRequest{}
reply := structs.ACLWhoAmIResponse{}
if a.srv.parse(resp, req, &args.Region, &args.QueryOptions) {
// Error parsing request, 400
resp.WriteHeader(http.StatusBadRequest)
resp.Write([]byte(http.StatusText(http.StatusBadRequest)))
return
}
if args.AuthToken == "" {
// 401 instead of 403 since no token was present.
resp.WriteHeader(http.StatusUnauthorized)
resp.Write([]byte(http.StatusText(http.StatusUnauthorized)))
return
}
if err := a.srv.agent.RPC("ACL.WhoAmI", &args, &reply); err != nil {
a.srv.logger.Error("error authenticating built API request", "error", err, "url", req.URL, "method", req.Method)
resp.WriteHeader(500)
resp.Write([]byte("Server error authenticating request\n"))
return
}
// Require an acl token or workload identity
if reply.Identity == nil || (reply.Identity.ACLToken == nil && reply.Identity.Claims == nil) {
a.srv.logger.Debug("Failed to authenticated Task API request", "method", req.Method, "url", req.URL)
resp.WriteHeader(http.StatusForbidden)
resp.Write([]byte(http.StatusText(http.StatusForbidden)))
return
}
a.srv.logger.Trace("Authenticated request", "id", reply.Identity, "method", req.Method, "url", req.URL)
a.wrapped.ServeHTTP(resp, req)
}

View File

@@ -819,7 +819,7 @@ func (s *HTTPServer) apiJobAndRequestToStructs(job *api.Job, req *http.Request,
queryRegion := req.URL.Query().Get("region")
requestRegion, jobRegion := regionForJob(
job, queryRegion, writeReq.Region, s.agent.config.Region,
job, queryRegion, writeReq.Region, s.agent.GetConfig().Region,
)
sJob := ApiJobToStructJob(job)

View File

@@ -25,14 +25,14 @@ func (s *HTTPServer) MetricsRequest(resp http.ResponseWriter, req *http.Request)
// Only return Prometheus formatted metrics if the user has enabled
// this functionality.
if !s.agent.config.Telemetry.PrometheusMetrics {
if !s.agent.GetConfig().Telemetry.PrometheusMetrics {
return nil, CodedError(http.StatusUnsupportedMediaType, "Prometheus is not enabled")
}
s.prometheusHandler().ServeHTTP(resp, req)
return nil, nil
}
return s.agent.InmemSink.DisplayMetrics(resp, req)
return s.agent.GetMetricsSink().DisplayMetrics(resp, req)
}
func (s *HTTPServer) prometheusHandler() http.Handler {

View File

@@ -16,7 +16,8 @@ func (s *HTTPServer) VariablesListRequest(resp http.ResponseWriter, req *http.Re
args := structs.VariablesListRequest{}
if s.parse(resp, req, &args.Region, &args.QueryOptions) {
return nil, nil
//TODO(schmichael) shouldn't we return something here?!
return nil, CodedError(http.StatusBadRequest, "failed to parse parameters")
}
var out structs.VariablesListResponse