Files
nomad/nomad/client_alloc_endpoint.go
Matt Keeler 833e240597 Upgrade to using hashicorp/go-metrics@v0.5.4 (#24856)
* Upgrade to using hashicorp/go-metrics@v0.5.4

This also requires bumping the dependencies for:

* memberlist
* serf
* raft
* raft-boltdb
* (and indirectly hashicorp/mdns due to the memberlist or serf update)

Unlike some other HashiCorp products, Nomad's root module is currently expected to be consumed by others. This means that it needs to be treated more like our libraries and upgrade to hashicorp/go-metrics by utilizing its compat packages. This allows those importing the root module to control the metrics module used via build tags.
2025-01-31 15:22:00 -05:00

634 lines
18 KiB
Go

// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package nomad
import (
"errors"
"fmt"
"io"
"net"
"net/http"
"time"
"github.com/hashicorp/go-hclog"
metrics "github.com/hashicorp/go-metrics/compat"
"github.com/hashicorp/go-msgpack/v2/codec"
"github.com/hashicorp/nomad/acl"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/helper/pointer"
"github.com/hashicorp/nomad/nomad/structs"
)
// ClientAllocations is used to forward RPC requests to the targeted Nomad client's
// Allocation endpoint. Its methods authenticate the caller, check ACLs against
// server state, and then relay the request to the node running the allocation.
type ClientAllocations struct {
// srv is the server this endpoint belongs to; it provides authentication,
// forwarding, state snapshots, and node connections to the handlers.
srv *Server
// logger is the endpoint's logger, derived from the server logger under
// the "client_allocs" name by NewClientAllocationsEndpoint.
logger hclog.Logger
}
// NewClientAllocationsEndpoint builds the ClientAllocations endpoint for srv.
// The endpoint's logger is the server logger named "client_allocs".
func NewClientAllocationsEndpoint(srv *Server) *ClientAllocations {
	endpoint := &ClientAllocations{
		srv:    srv,
		logger: srv.logger.Named("client_allocs"),
	}
	return endpoint
}
// register installs this endpoint's streaming RPC handlers on the server's
// streaming RPC registry. Only the exec handler is streaming.
func (a *ClientAllocations) register() {
	const execMethod = "Allocations.Exec"
	a.srv.streamingRpcs.Register(execMethod, a.exec)
}
// GarbageCollectAll is used to garbage collect all allocations on a client.
// The caller needs node write permission. The request is relayed to the node
// named by args.NodeID, either over a connection held by this server or by
// forwarding through findNodeConnAndForward.
func (a *ClientAllocations) GarbageCollectAll(args *structs.NodeSpecificRequest, reply *structs.GenericResponse) error {
	const method = "ClientAllocations.GarbageCollectAll"

	// Stale reads are always allowed: the only potentially stale information
	// is the Node registration, and adding another hop to the forwarding
	// chain is fairly costly.
	args.QueryOptions.AllowStale = true

	// Authenticate up front, but only act on the result after forwarding and
	// after the rate metric has been recorded.
	authErr := a.srv.Authenticate(nil, args)

	// Potentially forward to a different region.
	done, err := a.srv.forward(method, args, args, reply)
	if done {
		return err
	}
	a.srv.MeasureRPCRate("client_allocations", structs.RateMetricWrite, args)
	if authErr != nil {
		return structs.ErrPermissionDenied
	}
	defer metrics.MeasureSince([]string{"nomad", "client_allocations", "garbage_collect_all"}, time.Now())

	// Check node write permissions.
	aclObj, err := a.srv.ResolveACL(args)
	if err != nil {
		return err
	}
	if !aclObj.AllowNodeWrite() {
		return structs.ErrPermissionDenied
	}

	// A target node is mandatory.
	nodeID := args.NodeID
	if nodeID == "" {
		return errors.New("missing NodeID")
	}

	// Make sure the node is valid and new enough to support RPC.
	snapshot, err := a.srv.State().Snapshot()
	if err != nil {
		return err
	}
	if _, err := getNodeForRpc(snapshot, nodeID); err != nil {
		return err
	}

	// Use this server's connection to the client when it has one; otherwise
	// hand the request to findNodeConnAndForward.
	nodeConn, connected := a.srv.getNodeConn(nodeID)
	if connected {
		return NodeRpc(nodeConn.Session, "Allocations.GarbageCollectAll", args, reply)
	}
	return findNodeConnAndForward(a.srv, nodeID, method, args, reply)
}
// Signal is used to send a signal to an allocation on a client. The caller
// must hold the alloc-lifecycle capability on the allocation's namespace.
func (a *ClientAllocations) Signal(args *structs.AllocSignalRequest, reply *structs.GenericResponse) error {
	const method = "ClientAllocations.Signal"

	// Stale reads are always allowed: the only potentially stale information
	// is the Node registration, and adding another hop to the forwarding
	// chain is fairly costly.
	args.QueryOptions.AllowStale = true

	// The authentication result is only acted on after forwarding and after
	// the rate metric has been recorded.
	authErr := a.srv.Authenticate(nil, args)

	// Potentially forward to a different region.
	done, err := a.srv.forward(method, args, args, reply)
	if done {
		return err
	}
	a.srv.MeasureRPCRate("client_allocations", structs.RateMetricWrite, args)
	if authErr != nil {
		return structs.ErrPermissionDenied
	}
	defer metrics.MeasureSince([]string{"nomad", "client_allocations", "signal"}, time.Now())

	// Reject requests that do not name an allocation.
	if args.AllocID == "" {
		return errors.New("missing AllocID")
	}

	// Look the allocation up in a state snapshot.
	snapshot, err := a.srv.State().Snapshot()
	if err != nil {
		return err
	}
	alloc, err := getAlloc(snapshot, args.AllocID)
	if err != nil {
		return err
	}

	// Check namespace alloc-lifecycle permission.
	aclObj, err := a.srv.ResolveACL(args)
	if err != nil {
		return err
	}
	if !aclObj.AllowNsOp(alloc.Namespace, acl.NamespaceCapabilityAllocLifecycle) {
		return structs.ErrPermissionDenied
	}

	// Make sure the node is valid and new enough to support RPC.
	if _, err := getNodeForRpc(snapshot, alloc.NodeID); err != nil {
		return err
	}

	// Use this server's connection to the client when it has one; otherwise
	// hand the request to findNodeConnAndForward.
	nodeConn, connected := a.srv.getNodeConn(alloc.NodeID)
	if connected {
		return NodeRpc(nodeConn.Session, "Allocations.Signal", args, reply)
	}
	return findNodeConnAndForward(a.srv, alloc.NodeID, method, args, reply)
}
// SetPauseState relays a pause-state change request to the client running the
// allocation named by args.AllocID. The caller must hold the submit-job
// capability on the allocation's namespace.
func (a *ClientAllocations) SetPauseState(args *structs.AllocPauseRequest, reply *structs.GenericResponse) error {
	const method = "ClientAllocations.SetPauseState"

	args.QueryOptions.AllowStale = true

	// The authentication result is only acted on after forwarding and after
	// the rate metric has been recorded.
	authErr := a.srv.Authenticate(nil, args)

	// Potentially forward to a different region.
	done, err := a.srv.forward(method, args, args, reply)
	if done {
		return err
	}
	a.srv.MeasureRPCRate("client_allocations", structs.RateMetricWrite, args)
	if authErr != nil {
		return structs.ErrPermissionDenied
	}
	defer metrics.MeasureSince([]string{"nomad", "client_allocations", "pause_set"}, time.Now())

	// Reject requests that do not name an allocation.
	if args.AllocID == "" {
		return errors.New("missing AllocID")
	}

	// Look the allocation up in a state snapshot.
	snapshot, err := a.srv.State().Snapshot()
	if err != nil {
		return err
	}
	alloc, err := getAlloc(snapshot, args.AllocID)
	if err != nil {
		return err
	}

	// Check namespace submit-job permission.
	aclObj, err := a.srv.ResolveACL(args)
	if err != nil {
		return err
	}
	if !aclObj.AllowNsOp(alloc.Namespace, acl.NamespaceCapabilitySubmitJob) {
		return structs.ErrPermissionDenied
	}

	// Make sure the node is valid and new enough to support RPC.
	nodeID := alloc.NodeID
	if _, err := getNodeForRpc(snapshot, nodeID); err != nil {
		return err
	}

	// Use this server's connection to the client when it has one; otherwise
	// hand the request to findNodeConnAndForward.
	nodeConn, connected := a.srv.getNodeConn(nodeID)
	if connected {
		return NodeRpc(nodeConn.Session, "Allocations.SetPauseState", args, reply)
	}
	return findNodeConnAndForward(a.srv, nodeID, method, args, reply)
}
// GetPauseState relays a pause-state query to the client running the
// allocation named by args.AllocID. The caller must hold the read-job
// capability on the allocation's namespace.
func (a *ClientAllocations) GetPauseState(args *structs.AllocGetPauseStateRequest, reply *structs.AllocGetPauseStateResponse) error {
	const method = "ClientAllocations.GetPauseState"

	args.QueryOptions.AllowStale = true

	// The authentication result is only acted on after forwarding and after
	// the rate metric has been recorded.
	authErr := a.srv.Authenticate(nil, args)

	// Potentially forward to a different region.
	done, err := a.srv.forward(method, args, args, reply)
	if done {
		return err
	}
	a.srv.MeasureRPCRate("client_allocations", structs.RateMetricRead, args)
	if authErr != nil {
		return structs.ErrPermissionDenied
	}
	defer metrics.MeasureSince([]string{"nomad", "client_allocations", "pause_get"}, time.Now())

	// Reject requests that do not name an allocation.
	if args.AllocID == "" {
		return errors.New("missing AllocID")
	}

	// Look the allocation up in a state snapshot.
	snapshot, err := a.srv.State().Snapshot()
	if err != nil {
		return err
	}
	alloc, err := getAlloc(snapshot, args.AllocID)
	if err != nil {
		return err
	}

	// Check namespace read-job permission.
	aclObj, err := a.srv.ResolveACL(args)
	if err != nil {
		return err
	}
	if !aclObj.AllowNsOp(alloc.Namespace, acl.NamespaceCapabilityReadJob) {
		return structs.ErrPermissionDenied
	}

	// Make sure the node is valid and new enough to support RPC.
	nodeID := alloc.NodeID
	if _, err := getNodeForRpc(snapshot, nodeID); err != nil {
		return err
	}

	// Use this server's connection to the client when it has one; otherwise
	// hand the request to findNodeConnAndForward.
	nodeConn, connected := a.srv.getNodeConn(nodeID)
	if connected {
		return NodeRpc(nodeConn.Session, "Allocations.GetPauseState", args, reply)
	}
	return findNodeConnAndForward(a.srv, nodeID, method, args, reply)
}
// GarbageCollect is used to garbage collect an allocation on a client. The
// caller must hold the submit-job capability on the allocation's namespace.
func (a *ClientAllocations) GarbageCollect(args *structs.AllocSpecificRequest, reply *structs.GenericResponse) error {
	const method = "ClientAllocations.GarbageCollect"

	// Stale reads are always allowed: the only potentially stale information
	// is the Node registration, and adding another hop to the forwarding
	// chain is fairly costly.
	args.QueryOptions.AllowStale = true

	// The authentication result is only acted on after forwarding and after
	// the rate metric has been recorded.
	authErr := a.srv.Authenticate(nil, args)

	// Potentially forward to a different region.
	done, err := a.srv.forward(method, args, args, reply)
	if done {
		return err
	}
	a.srv.MeasureRPCRate("client_allocations", structs.RateMetricWrite, args)
	if authErr != nil {
		return structs.ErrPermissionDenied
	}
	defer metrics.MeasureSince([]string{"nomad", "client_allocations", "garbage_collect"}, time.Now())

	// Reject requests that do not name an allocation.
	if args.AllocID == "" {
		return errors.New("missing AllocID")
	}

	// Look the allocation up in a state snapshot.
	snapshot, err := a.srv.State().Snapshot()
	if err != nil {
		return err
	}
	alloc, err := getAlloc(snapshot, args.AllocID)
	if err != nil {
		return err
	}

	// Check namespace submit-job permission.
	aclObj, err := a.srv.ResolveACL(args)
	if err != nil {
		return err
	}
	if !aclObj.AllowNsOp(alloc.Namespace, acl.NamespaceCapabilitySubmitJob) {
		return structs.ErrPermissionDenied
	}

	// Make sure the node is valid and new enough to support RPC.
	if _, err := getNodeForRpc(snapshot, alloc.NodeID); err != nil {
		return err
	}

	// Use this server's connection to the client when it has one; otherwise
	// hand the request to findNodeConnAndForward.
	nodeConn, connected := a.srv.getNodeConn(alloc.NodeID)
	if connected {
		return NodeRpc(nodeConn.Session, "Allocations.GarbageCollect", args, reply)
	}
	return findNodeConnAndForward(a.srv, alloc.NodeID, method, args, reply)
}
// Restart is used to trigger a restart of an allocation or a subtask on a
// client. The caller must hold the alloc-lifecycle capability on the
// allocation's namespace. args.AllocID is not checked here before being
// handed to getAlloc.
func (a *ClientAllocations) Restart(args *structs.AllocRestartRequest, reply *structs.GenericResponse) error {
	const method = "ClientAllocations.Restart"

	// Stale reads are always allowed: the only potentially stale information
	// is the Node registration, and adding another hop to the forwarding
	// chain is fairly costly.
	args.QueryOptions.AllowStale = true

	// The authentication result is only acted on after forwarding and after
	// the rate metric has been recorded.
	authErr := a.srv.Authenticate(nil, args)

	// Potentially forward to a different region.
	done, err := a.srv.forward(method, args, args, reply)
	if done {
		return err
	}
	a.srv.MeasureRPCRate("client_allocations", structs.RateMetricWrite, args)
	if authErr != nil {
		return structs.ErrPermissionDenied
	}
	defer metrics.MeasureSince([]string{"nomad", "client_allocations", "restart"}, time.Now())

	// Look the allocation up in a state snapshot.
	snapshot, err := a.srv.State().Snapshot()
	if err != nil {
		return err
	}
	alloc, err := getAlloc(snapshot, args.AllocID)
	if err != nil {
		return err
	}

	// Check for namespace alloc-lifecycle permissions.
	aclObj, err := a.srv.ResolveACL(args)
	if err != nil {
		return err
	}
	if !aclObj.AllowNsOp(alloc.Namespace, acl.NamespaceCapabilityAllocLifecycle) {
		return structs.ErrPermissionDenied
	}

	// Make sure the node is valid and new enough to support RPC.
	nodeID := alloc.NodeID
	if _, err := getNodeForRpc(snapshot, nodeID); err != nil {
		return err
	}

	// Use this server's connection to the client when it has one; otherwise
	// hand the request to findNodeConnAndForward.
	nodeConn, connected := a.srv.getNodeConn(nodeID)
	if connected {
		return NodeRpc(nodeConn.Session, "Allocations.Restart", args, reply)
	}
	return findNodeConnAndForward(a.srv, nodeID, method, args, reply)
}
// Stats is used to collect allocation statistics from the client running the
// allocation. The caller must hold the read-job capability on the
// allocation's namespace. args.AllocID is not checked here before being
// handed to getAlloc.
func (a *ClientAllocations) Stats(args *cstructs.AllocStatsRequest, reply *cstructs.AllocStatsResponse) error {
	const method = "ClientAllocations.Stats"

	// Stale reads are always allowed: the only potentially stale information
	// is the Node registration, and adding another hop to the forwarding
	// chain is fairly costly.
	args.QueryOptions.AllowStale = true

	// The authentication result is only acted on after forwarding and after
	// the rate metric has been recorded.
	authErr := a.srv.Authenticate(nil, args)

	// Potentially forward to a different region.
	done, err := a.srv.forward(method, args, args, reply)
	if done {
		return err
	}
	a.srv.MeasureRPCRate("client_allocations", structs.RateMetricRead, args)
	if authErr != nil {
		return structs.ErrPermissionDenied
	}
	defer metrics.MeasureSince([]string{"nomad", "client_allocations", "stats"}, time.Now())

	// Look the allocation up in a state snapshot.
	snapshot, err := a.srv.State().Snapshot()
	if err != nil {
		return err
	}
	alloc, err := getAlloc(snapshot, args.AllocID)
	if err != nil {
		return err
	}

	// Check for namespace read-job permissions.
	aclObj, err := a.srv.ResolveACL(args)
	if err != nil {
		return err
	}
	if !aclObj.AllowNsOp(alloc.Namespace, acl.NamespaceCapabilityReadJob) {
		return structs.ErrPermissionDenied
	}

	// Make sure the node is valid and new enough to support RPC.
	if _, err := getNodeForRpc(snapshot, alloc.NodeID); err != nil {
		return err
	}

	// Use this server's connection to the client when it has one; otherwise
	// hand the request to findNodeConnAndForward.
	nodeConn, connected := a.srv.getNodeConn(alloc.NodeID)
	if connected {
		return NodeRpc(nodeConn.Session, "Allocations.Stats", args, reply)
	}
	return findNodeConnAndForward(a.srv, alloc.NodeID, method, args, reply)
}
// Checks is the server implementation of the allocation checks RPC. The
// ultimate response is provided by the node running the allocation. This RPC
// is needed to handle queries which hit the server agent API directly, or via
// another node which is not running the allocation. The caller must hold the
// read-job capability on the allocation's namespace.
func (a *ClientAllocations) Checks(args *cstructs.AllocChecksRequest, reply *cstructs.AllocChecksResponse) error {
	const method = "ClientAllocations.Checks"

	// Stale reads are always allowed: the only potentially stale information
	// is the Node registration, and adding another hop to the forwarding
	// chain is fairly costly.
	args.QueryOptions.AllowStale = true

	// The authentication result is only acted on after forwarding and after
	// the rate metric has been recorded.
	authErr := a.srv.Authenticate(nil, args)

	// Potentially forward to a different region.
	done, err := a.srv.forward(method, args, args, reply)
	if done {
		return err
	}
	a.srv.MeasureRPCRate("client_allocations", structs.RateMetricRead, args)
	if authErr != nil {
		return structs.ErrPermissionDenied
	}
	defer metrics.MeasureSince([]string{"nomad", "client_allocations", "checks"}, time.Now())

	// One state snapshot backs every lookup below.
	snapshot, err := a.srv.State().Snapshot()
	if err != nil {
		return err
	}

	// The full allocation object supplies the namespace and node ID.
	alloc, err := getAlloc(snapshot, args.AllocID)
	if err != nil {
		return err
	}

	// Check for namespace read-job permissions.
	aclObj, err := a.srv.ResolveACL(args)
	if err != nil {
		return err
	}
	if !aclObj.AllowNsOp(alloc.Namespace, acl.NamespaceCapabilityReadJob) {
		return structs.ErrPermissionDenied
	}

	// Make sure the node is valid and new enough to support RPC.
	nodeID := alloc.NodeID
	_, err = getNodeForRpc(snapshot, nodeID)
	if err != nil {
		return err
	}

	// Use this server's connection to the client when it has one; otherwise
	// hand the request to findNodeConnAndForward.
	nodeConn, connected := a.srv.getNodeConn(nodeID)
	if connected {
		return NodeRpc(nodeConn.Session, "Allocations.Checks", args, reply)
	}
	return findNodeConnAndForward(a.srv, nodeID, method, args, reply)
}
// exec is the streaming RPC handler used to execute a command in a running
// task. It decodes an AllocExecRequest from conn, authenticates and authorizes
// the caller, validates the target allocation (and job, when one is given),
// and then bridges conn to a stream on the client running the allocation,
// either directly or through the server that holds the node connection.
//
// Failures are reported to the caller through handleStreamResultError on the
// same connection. conn is always closed when the handler returns.
func (a *ClientAllocations) exec(conn io.ReadWriteCloser) {
	defer conn.Close()
	defer metrics.MeasureSince([]string{"nomad", "alloc", "exec"}, time.Now())

	// Decode the arguments
	var args cstructs.AllocExecRequest
	decoder := codec.NewDecoder(conn, structs.MsgpackHandle)
	encoder := codec.NewEncoder(conn, structs.MsgpackHandle)

	if err := decoder.Decode(&args); err != nil {
		handleStreamResultError(err, pointer.Of(int64(500)), encoder)
		return
	}

	// The authentication result is only acted on after region forwarding and
	// after the rate metric has been recorded.
	authErr := a.srv.Authenticate(nil, &args)

	// Check if we need to forward to a different region
	if r := args.RequestRegion(); r != a.srv.Region() {
		forwardRegionStreamingRpc(a.srv, conn, encoder, &args, "Allocations.Exec",
			args.AllocID, &args.QueryOptions)
		return
	}
	a.srv.MeasureRPCRate("client_allocations", structs.RateMetricWrite, &args)
	if authErr != nil {
		handleStreamResultError(structs.ErrPermissionDenied, nil, encoder)
		return
	}

	// Verify the arguments.
	if args.AllocID == "" {
		handleStreamResultError(errors.New("missing AllocID"), pointer.Of(int64(400)), encoder)
		return
	}

	// Retrieve the allocation
	snap, err := a.srv.State().Snapshot()
	if err != nil {
		handleStreamResultError(err, nil, encoder)
		return
	}

	alloc, err := getAlloc(snap, args.AllocID)
	if structs.IsErrUnknownAllocation(err) {
		handleStreamResultError(err, pointer.Of(int64(404)), encoder)
		return
	}
	if err != nil {
		handleStreamResultError(err, nil, encoder)
		return
	}

	// Check namespace alloc-exec permission.
	if aclObj, err := a.srv.ResolveACL(&args); err != nil {
		handleStreamResultError(err, nil, encoder)
		return
	} else if !aclObj.AllowNsOp(alloc.Namespace, acl.NamespaceCapabilityAllocExec) {
		// client ultimately checks if AllocNodeExec is required
		handleStreamResultError(structs.ErrPermissionDenied, nil, encoder)
		return
	}

	// Exec is refused for allocations whose client status is terminal.
	if alloc.ClientTerminalStatus() {
		handleStreamResultError(fmt.Errorf("exec not possible, client status of allocation %s is %s", alloc.ID, alloc.ClientStatus),
			pointer.Of(int64(http.StatusBadRequest)), encoder)
		return
	}

	// Handle job ID if requested.
	if args.JobID != "" {
		// Verify job exists.
		job, err := snap.JobByID(nil, args.Namespace, args.JobID)
		if err != nil {
			handleStreamResultError(err,
				pointer.Of(int64(http.StatusInternalServerError)), encoder)
			return
		}
		if job == nil {
			handleStreamResultError(
				fmt.Errorf("job %s not found in namespace %s", args.JobID, args.Namespace),
				pointer.Of(int64(http.StatusNotFound)), encoder)
			return
		}

		// Verify requested allocation belongs to the job.
		if args.JobID != alloc.JobID {
			handleStreamResultError(
				fmt.Errorf("job %s does not have allocation %s", args.JobID, alloc.ID),
				pointer.Of(int64(http.StatusBadRequest)), encoder,
			)
			// Stop here: without this return the handler would report the
			// mismatch and then still open the exec stream to the client.
			return
		}
	}

	nodeID := alloc.NodeID

	// Make sure Node is valid and new enough to support RPC
	node, err := snap.NodeByID(nil, nodeID)
	if err != nil {
		handleStreamResultError(err, pointer.Of(int64(500)), encoder)
		return
	}
	if node == nil {
		err := fmt.Errorf("Unknown node %q", nodeID)
		handleStreamResultError(err, pointer.Of(int64(400)), encoder)
		return
	}
	if err := nodeSupportsRpc(node); err != nil {
		handleStreamResultError(err, pointer.Of(int64(400)), encoder)
		return
	}

	// Get the connection to the client either by forwarding to another server
	// or creating a direct stream
	var clientConn net.Conn
	state, ok := a.srv.getNodeConn(nodeID)
	if !ok {
		// Determine the Server that has a connection to the node.
		srv, err := a.srv.serverWithNodeConn(nodeID, a.srv.Region())
		if err != nil {
			// Only a missing node connection maps to a 404; other errors
			// carry no explicit code.
			var code *int64
			if structs.IsErrNoNodeConn(err) {
				code = pointer.Of(int64(404))
			}
			handleStreamResultError(err, code, encoder)
			return
		}

		// Get a connection to the server. Named srvConn so that it does not
		// shadow the caller's conn.
		srvConn, err := a.srv.streamingRpc(srv, "Allocations.Exec")
		if err != nil {
			handleStreamResultError(err, nil, encoder)
			return
		}
		clientConn = srvConn
	} else {
		stream, err := NodeStreamingRpc(state.Session, "Allocations.Exec")
		if err != nil {
			handleStreamResultError(err, nil, encoder)
			return
		}
		clientConn = stream
	}
	defer clientConn.Close()

	// Send the request.
	outEncoder := codec.NewEncoder(clientConn, structs.MsgpackHandle)
	if err := outEncoder.Encode(args); err != nil {
		handleStreamResultError(err, nil, encoder)
		return
	}

	// Relay traffic between the caller and the client stream.
	structs.Bridge(conn, clientConn)
}