allocs fit checks if devices get oversubscribed
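structs.AllocsFit gains a trailing checkDevices flag. The plan applier passes true, so a plan that assigns the same device instance to more than one allocation is rejected with the reason "device oversubscribed". The scheduler's ranking path and all pre-existing test call sites pass false, keeping their prior behavior. The commit also moves NewDeviceAccounter above its callers in the devices file, regroups two import blocks, and adds tests at both the plan-apply and structs layers.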
@@ -5,10 +5,9 @@ import (
 	"runtime"
 	"time"
 
+	"github.com/armon/go-metrics"
 	log "github.com/hashicorp/go-hclog"
 	memdb "github.com/hashicorp/go-memdb"
-
-	"github.com/armon/go-metrics"
 	"github.com/hashicorp/go-multierror"
 	"github.com/hashicorp/nomad/helper/uuid"
 	"github.com/hashicorp/nomad/nomad/state"
@@ -536,6 +535,6 @@ func evaluateNodePlan(snap *state.StateSnapshot, plan *structs.Plan, nodeID stri
 	proposed = append(proposed, plan.NodeAllocation[nodeID]...)
 
 	// Check if these allocations fit
-	fit, reason, _, err := structs.AllocsFit(node, proposed, nil)
+	fit, reason, _, err := structs.AllocsFit(node, proposed, nil, true)
 	return fit, reason, err
 }
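This is the call site that opts in with checkDevices set to true: at plan apply time the server re-validates the proposed allocations against the node's devices, so a plan that would double-book a device instance is rejected before it is committed to state.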
@@ -640,6 +640,65 @@ func TestPlanApply_EvalNodePlan_NodeFull(t *testing.T) {
 	}
 }
 
+// Test that we detect device oversubscription
+func TestPlanApply_EvalNodePlan_NodeFull_Device(t *testing.T) {
+	t.Parallel()
+	alloc := mock.Alloc()
+	state := testStateStore(t)
+	node := mock.NvidiaNode()
+	node.ReservedResources = nil
+
+	nvidia0 := node.NodeResources.Devices[0].Instances[0].ID
+
+	// Have the allocation use a Nvidia device
+	alloc.NodeID = node.ID
+	alloc.AllocatedResources.Tasks["web"].Devices = []*structs.AllocatedDeviceResource{
+		{
+			Type:      "gpu",
+			Vendor:    "nvidia",
+			Name:      "1080ti",
+			DeviceIDs: []string{nvidia0},
+		},
+	}
+
+	state.UpsertJobSummary(999, mock.JobSummary(alloc.JobID))
+	state.UpsertNode(1000, node)
+	state.UpsertAllocs(1001, []*structs.Allocation{alloc})
+
+	// Alloc2 tries to use the same device
+	alloc2 := mock.Alloc()
+	alloc2.AllocatedResources.Tasks["web"].Networks = nil
+	alloc2.AllocatedResources.Tasks["web"].Devices = []*structs.AllocatedDeviceResource{
+		{
+			Type:      "gpu",
+			Vendor:    "nvidia",
+			Name:      "1080ti",
+			DeviceIDs: []string{nvidia0},
+		},
+	}
+	alloc2.NodeID = node.ID
+	state.UpsertJobSummary(1200, mock.JobSummary(alloc2.JobID))
+
+	snap, _ := state.Snapshot()
+	plan := &structs.Plan{
+		Job: alloc.Job,
+		NodeAllocation: map[string][]*structs.Allocation{
+			node.ID: {alloc2},
+		},
+	}
+
+	fit, reason, err := evaluateNodePlan(snap, plan, node.ID)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	if fit {
+		t.Fatalf("bad")
+	}
+	if reason != "device oversubscribed" {
+		t.Fatalf("bad: %q", reason)
+	}
+}
+
 func TestPlanApply_EvalNodePlan_UpdateExisting(t *testing.T) {
 	t.Parallel()
 	alloc := mock.Alloc()
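The test exercises the full plan-apply path: it upserts a node and an allocation that holds the nvidia0 instance into the state store, then submits a plan whose allocation claims the same instance and asserts that evaluateNodePlan returns fit == false with the reason "device oversubscribed".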
@@ -18,6 +18,42 @@ type DeviceAccounterInstance struct {
 	Instances map[string]int
 }
 
+// NewDeviceAccounter returns a new device accounter. The node is used to
+// populate the set of available devices based on what healthy device instances
+// exist on the node.
+func NewDeviceAccounter(n *Node) *DeviceAccounter {
+	numDevices := 0
+	var devices []*NodeDeviceResource
+
+	// COMPAT(0.11): Remove in 0.11
+	if n.NodeResources != nil {
+		numDevices = len(n.NodeResources.Devices)
+		devices = n.NodeResources.Devices
+	}
+
+	d := &DeviceAccounter{
+		Devices: make(map[DeviceIdTuple]*DeviceAccounterInstance, numDevices),
+	}
+
+	for _, dev := range devices {
+		id := *dev.ID()
+		d.Devices[id] = &DeviceAccounterInstance{
+			Device:    dev,
+			Instances: make(map[string]int, len(dev.Instances)),
+		}
+		for _, instance := range dev.Instances {
+			// Skip unhealthy devices as they aren't allocatable
+			if !instance.Healthy {
+				continue
+			}
+
+			d.Devices[id].Instances[instance.ID] = 0
+		}
+	}
+
+	return d
+}
+
 // AddAllocs takes a set of allocations and internally marks which devices are
 // used. If a device is used more than once by the set of passed allocations,
 // the collision will be returned as true.
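The accounting behind this is plain reference counting: NewDeviceAccounter seeds every healthy device instance with a count of zero, and AddAllocs, whose documentation appears in the context above, increments the count for each instance an allocation claims, reporting a collision once any count exceeds one. A minimal standalone sketch of the idea, assuming nothing beyond the standard library (instanceCounts and addAlloc are hypothetical names, not Nomad's API):

package main

import "fmt"

// instanceCounts mirrors the idea behind DeviceAccounterInstance.Instances:
// each device instance ID maps to the number of allocations using it.
type instanceCounts map[string]int

// addAlloc records one allocation's device instance IDs and reports whether
// any instance is now referenced more than once (a collision).
func addAlloc(counts instanceCounts, deviceIDs []string) (collision bool) {
	for _, id := range deviceIDs {
		counts[id]++
		if counts[id] > 1 {
			collision = true
		}
	}
	return collision
}

func main() {
	counts := instanceCounts{}
	fmt.Println(addAlloc(counts, []string{"nvidia-0"})) // false: first claim
	fmt.Println(addAlloc(counts, []string{"nvidia-0"})) // true: oversubscribed
}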
@@ -92,39 +128,3 @@ func (d *DeviceAccounter) AddReserved(res *AllocatedDeviceResource) (collision b
 
 	return
 }
-
-// NewDeviceAccounter returns a new device accounter. The node is used to
-// populate the set of available devices based on what healthy device instances
-// exist on the node.
-func NewDeviceAccounter(n *Node) *DeviceAccounter {
-	numDevices := 0
-	var devices []*NodeDeviceResource
-
-	// COMPAT(0.11): Remove in 0.11
-	if n.NodeResources != nil {
-		numDevices = len(n.NodeResources.Devices)
-		devices = n.NodeResources.Devices
-	}
-
-	d := &DeviceAccounter{
-		Devices: make(map[DeviceIdTuple]*DeviceAccounterInstance, numDevices),
-	}
-
-	for _, dev := range devices {
-		id := *dev.ID()
-		d.Devices[id] = &DeviceAccounterInstance{
-			Device:    dev,
-			Instances: make(map[string]int, len(dev.Instances)),
-		}
-		for _, instance := range dev.Instances {
-			// Skip unhealthy devices as they aren't allocatable
-			if !instance.Healthy {
-				continue
-			}
-
-			d.Devices[id].Instances[instance.ID] = 0
-		}
-	}
-
-	return d
-}
@@ -10,11 +10,10 @@ import (
 	"strconv"
 	"strings"
 
+	"golang.org/x/crypto/blake2b"
 	multierror "github.com/hashicorp/go-multierror"
 	lru "github.com/hashicorp/golang-lru"
 	"github.com/hashicorp/nomad/acl"
-
-	"golang.org/x/crypto/blake2b"
 )
 
 // MergeMultierrorWarnings takes job warnings and canonicalize warnings and
@@ -98,8 +97,9 @@ func FilterTerminalAllocs(allocs []*Allocation) ([]*Allocation, map[string]*Allo
 // AllocsFit checks if a given set of allocations will fit on a node.
 // The netIdx can optionally be provided if its already been computed.
 // If the netIdx is provided, it is assumed that the client has already
-// ensured there are no collisions.
-func AllocsFit(node *Node, allocs []*Allocation, netIdx *NetworkIndex) (bool, string, *ComparableResources, error) {
+// ensured there are no collisions. If checkDevices is set to true, we check if
+// there is a device oversubscription.
+func AllocsFit(node *Node, allocs []*Allocation, netIdx *NetworkIndex, checkDevices bool) (bool, string, *ComparableResources, error) {
 	// Compute the utilization from zero
 	used := new(ComparableResources)
 
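Note that this is a breaking signature change: every AllocsFit caller must now supply the trailing checkDevices argument. In this commit all pre-existing call sites pass false, so only the plan applier changes behavior.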
@@ -136,6 +136,14 @@ func AllocsFit(node *Node, allocs []*Allocation, netIdx *NetworkIndex) (bool, st
 		return false, "bandwidth exceeded", used, nil
 	}
 
+	// Check devices
+	if checkDevices {
+		accounter := NewDeviceAccounter(node)
+		if accounter.AddAllocs(allocs) {
+			return false, "device oversubscribed", used, nil
+		}
+	}
+
 	// Allocations fit!
 	return true, "", used, nil
 }
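The check rebuilds a DeviceAccounter from the node's healthy device instances and replays the entire allocation set through AddAllocs; the first instance claimed twice fails the fit, and the "device oversubscribed" reason is returned to the caller.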
@@ -116,7 +116,7 @@ func TestAllocsFit_PortsOvercommitted_Old(t *testing.T) {
 	}
 
 	// Should fit one allocation
-	fit, dim, _, err := AllocsFit(n, []*Allocation{a1}, nil)
+	fit, dim, _, err := AllocsFit(n, []*Allocation{a1}, nil, false)
 	if err != nil {
 		t.Fatalf("err: %v", err)
 	}
@@ -125,7 +125,7 @@ func TestAllocsFit_PortsOvercommitted_Old(t *testing.T) {
 	}
 
 	// Should not fit second allocation
-	fit, _, _, err = AllocsFit(n, []*Allocation{a1, a1}, nil)
+	fit, _, _, err = AllocsFit(n, []*Allocation{a1, a1}, nil, false)
 	if err != nil {
 		t.Fatalf("err: %v", err)
 	}
@@ -186,7 +186,7 @@ func TestAllocsFit_Old(t *testing.T) {
 	}
 
 	// Should fit one allocation
-	fit, _, used, err := AllocsFit(n, []*Allocation{a1}, nil)
+	fit, _, used, err := AllocsFit(n, []*Allocation{a1}, nil, false)
 	require.NoError(err)
 	require.True(fit)
 
@@ -195,7 +195,7 @@ func TestAllocsFit_Old(t *testing.T) {
 	require.EqualValues(2048, used.Flattened.Memory.MemoryMB)
 
 	// Should not fit second allocation
-	fit, _, used, err = AllocsFit(n, []*Allocation{a1, a1}, nil)
+	fit, _, used, err = AllocsFit(n, []*Allocation{a1, a1}, nil, false)
 	require.NoError(err)
 	require.False(fit)
 
@@ -256,7 +256,7 @@ func TestAllocsFit_TerminalAlloc_Old(t *testing.T) {
 	}
 
 	// Should fit one allocation
-	fit, _, used, err := AllocsFit(n, []*Allocation{a1}, nil)
+	fit, _, used, err := AllocsFit(n, []*Allocation{a1}, nil, false)
 	require.NoError(err)
 	require.True(fit)
 
@@ -267,7 +267,7 @@ func TestAllocsFit_TerminalAlloc_Old(t *testing.T) {
 	// Should fit second allocation since it is terminal
 	a2 := a1.Copy()
 	a2.DesiredStatus = AllocDesiredStatusStop
-	fit, _, used, err = AllocsFit(n, []*Allocation{a1, a2}, nil)
+	fit, _, used, err = AllocsFit(n, []*Allocation{a1, a2}, nil, false)
 	require.NoError(err)
 	require.True(fit)
 
@@ -341,7 +341,7 @@ func TestAllocsFit(t *testing.T) {
 	}
 
 	// Should fit one allocation
-	fit, _, used, err := AllocsFit(n, []*Allocation{a1}, nil)
+	fit, _, used, err := AllocsFit(n, []*Allocation{a1}, nil, false)
 	require.NoError(err)
 	require.True(fit)
 
@@ -350,7 +350,7 @@ func TestAllocsFit(t *testing.T) {
 	require.EqualValues(2048, used.Flattened.Memory.MemoryMB)
 
 	// Should not fit second allocation
-	fit, _, used, err = AllocsFit(n, []*Allocation{a1, a1}, nil)
+	fit, _, used, err = AllocsFit(n, []*Allocation{a1, a1}, nil, false)
 	require.NoError(err)
 	require.False(fit)
 
@@ -425,7 +425,7 @@ func TestAllocsFit_TerminalAlloc(t *testing.T) {
 	}
 
 	// Should fit one allocation
-	fit, _, used, err := AllocsFit(n, []*Allocation{a1}, nil)
+	fit, _, used, err := AllocsFit(n, []*Allocation{a1}, nil, false)
 	require.NoError(err)
 	require.True(fit)
 
@@ -436,7 +436,7 @@ func TestAllocsFit_TerminalAlloc(t *testing.T) {
 	// Should fit second allocation since it is terminal
 	a2 := a1.Copy()
 	a2.DesiredStatus = AllocDesiredStatusStop
-	fit, dim, used, err := AllocsFit(n, []*Allocation{a1, a2}, nil)
+	fit, dim, used, err := AllocsFit(n, []*Allocation{a1, a2}, nil, false)
 	require.NoError(err)
 	require.True(fit, dim)
 
@@ -445,6 +445,72 @@ func TestAllocsFit_TerminalAlloc(t *testing.T) {
 	require.EqualValues(2048, used.Flattened.Memory.MemoryMB)
 }
 
+// Tests that AllocsFit detects device collisions
+func TestAllocsFit_Devices(t *testing.T) {
+	require := require.New(t)
+
+	n := MockNvidiaNode()
+	a1 := &Allocation{
+		AllocatedResources: &AllocatedResources{
+			Tasks: map[string]*AllocatedTaskResources{
+				"web": {
+					Cpu: AllocatedCpuResources{
+						CpuShares: 1000,
+					},
+					Memory: AllocatedMemoryResources{
+						MemoryMB: 1024,
+					},
+					Devices: []*AllocatedDeviceResource{
+						{
+							Type:      "gpu",
+							Vendor:    "nvidia",
+							Name:      "1080ti",
+							DeviceIDs: []string{n.NodeResources.Devices[0].Instances[0].ID},
+						},
+					},
+				},
+			},
+			Shared: AllocatedSharedResources{
+				DiskMB: 5000,
+			},
+		},
+	}
+	a2 := a1.Copy()
+	a2.AllocatedResources.Tasks["web"] = &AllocatedTaskResources{
+		Cpu: AllocatedCpuResources{
+			CpuShares: 1000,
+		},
+		Memory: AllocatedMemoryResources{
+			MemoryMB: 1024,
+		},
+		Devices: []*AllocatedDeviceResource{
+			{
+				Type:      "gpu",
+				Vendor:    "nvidia",
+				Name:      "1080ti",
+				DeviceIDs: []string{n.NodeResources.Devices[0].Instances[0].ID}, // Use the same ID
+			},
+		},
+	}
+
+	// Should fit one allocation
+	fit, _, _, err := AllocsFit(n, []*Allocation{a1}, nil, true)
+	require.NoError(err)
+	require.True(fit)
+
+	// Should not fit second allocation
+	fit, msg, _, err := AllocsFit(n, []*Allocation{a1, a1}, nil, true)
+	require.NoError(err)
+	require.False(fit)
+	require.Equal("device oversubscribed", msg)
+
+	// Should not fit second allocation but won't detect since we disabled
+	// devices
+	fit, _, _, err = AllocsFit(n, []*Allocation{a1, a1}, nil, false)
+	require.NoError(err)
+	require.True(fit)
+}
+
 // COMPAT(0.11): Remove in 0.11
 func TestScoreFit_Old(t *testing.T) {
 	node := &Node{}
@@ -313,8 +313,8 @@ OUTER:
 		// Add the resources we are trying to fit
 		proposed = append(proposed, &structs.Allocation{AllocatedResources: total})
 
-		// Check if these allocations fit
-		fit, dim, util, _ := structs.AllocsFit(option.Node, proposed, netIdx)
+		// Check if these allocations fit, if they do not, simply skip this node
+		fit, dim, util, _ := structs.AllocsFit(option.Node, proposed, netIdx, false)
 		netIdx.Release()
 		if !fit {
 			// Skip the node if evictions are not enabled
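The scheduler's ranking loop keeps checkDevices at false: as the updated comment says, a node whose allocations do not fit is simply skipped here, and device oversubscription is presumably left to the scheduler's own device allocation logic, with the plan applier's checkDevices=true call serving as the authoritative guard.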