diff --git a/nomad/plan_apply.go b/nomad/plan_apply.go index 95dd007f0..28a2b14db 100644 --- a/nomad/plan_apply.go +++ b/nomad/plan_apply.go @@ -5,10 +5,9 @@ import ( "runtime" "time" + "github.com/armon/go-metrics" log "github.com/hashicorp/go-hclog" memdb "github.com/hashicorp/go-memdb" - - "github.com/armon/go-metrics" "github.com/hashicorp/go-multierror" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/state" @@ -536,6 +535,6 @@ func evaluateNodePlan(snap *state.StateSnapshot, plan *structs.Plan, nodeID stri proposed = append(proposed, plan.NodeAllocation[nodeID]...) // Check if these allocations fit - fit, reason, _, err := structs.AllocsFit(node, proposed, nil) + fit, reason, _, err := structs.AllocsFit(node, proposed, nil, true) return fit, reason, err } diff --git a/nomad/plan_apply_test.go b/nomad/plan_apply_test.go index 1b5142aa9..76ed0ad89 100644 --- a/nomad/plan_apply_test.go +++ b/nomad/plan_apply_test.go @@ -640,6 +640,65 @@ func TestPlanApply_EvalNodePlan_NodeFull(t *testing.T) { } } +// Test that we detect device oversubscription +func TestPlanApply_EvalNodePlan_NodeFull_Device(t *testing.T) { + t.Parallel() + alloc := mock.Alloc() + state := testStateStore(t) + node := mock.NvidiaNode() + node.ReservedResources = nil + + nvidia0 := node.NodeResources.Devices[0].Instances[0].ID + + // Have the allocation use a Nvidia device + alloc.NodeID = node.ID + alloc.AllocatedResources.Tasks["web"].Devices = []*structs.AllocatedDeviceResource{ + { + Type: "gpu", + Vendor: "nvidia", + Name: "1080ti", + DeviceIDs: []string{nvidia0}, + }, + } + + state.UpsertJobSummary(999, mock.JobSummary(alloc.JobID)) + state.UpsertNode(1000, node) + state.UpsertAllocs(1001, []*structs.Allocation{alloc}) + + // Alloc2 tries to use the same device + alloc2 := mock.Alloc() + alloc2.AllocatedResources.Tasks["web"].Networks = nil + alloc2.AllocatedResources.Tasks["web"].Devices = []*structs.AllocatedDeviceResource{ + { + Type: "gpu", + Vendor: "nvidia", + 
Name: "1080ti", + DeviceIDs: []string{nvidia0}, + }, + } + alloc2.NodeID = node.ID + state.UpsertJobSummary(1200, mock.JobSummary(alloc2.JobID)) + + snap, _ := state.Snapshot() + plan := &structs.Plan{ + Job: alloc.Job, + NodeAllocation: map[string][]*structs.Allocation{ + node.ID: {alloc2}, + }, + } + + fit, reason, err := evaluateNodePlan(snap, plan, node.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + if fit { + t.Fatalf("bad") + } + if reason != "device oversubscribed" { + t.Fatalf("bad: %q", reason) + } +} + func TestPlanApply_EvalNodePlan_UpdateExisting(t *testing.T) { t.Parallel() alloc := mock.Alloc() diff --git a/nomad/structs/devices.go b/nomad/structs/devices.go index 6f179f42c..5ee537e04 100644 --- a/nomad/structs/devices.go +++ b/nomad/structs/devices.go @@ -18,6 +18,42 @@ type DeviceAccounterInstance struct { Instances map[string]int } +// NewDeviceAccounter returns a new device accounter. The node is used to +// populate the set of available devices based on what healthy device instances +// exist on the node. +func NewDeviceAccounter(n *Node) *DeviceAccounter { + numDevices := 0 + var devices []*NodeDeviceResource + + // COMPAT(0.11): Remove in 0.11 + if n.NodeResources != nil { + numDevices = len(n.NodeResources.Devices) + devices = n.NodeResources.Devices + } + + d := &DeviceAccounter{ + Devices: make(map[DeviceIdTuple]*DeviceAccounterInstance, numDevices), + } + + for _, dev := range devices { + id := *dev.ID() + d.Devices[id] = &DeviceAccounterInstance{ + Device: dev, + Instances: make(map[string]int, len(dev.Instances)), + } + for _, instance := range dev.Instances { + // Skip unhealthy devices as they aren't allocatable + if !instance.Healthy { + continue + } + + d.Devices[id].Instances[instance.ID] = 0 + } + } + + return d +} + // AddAllocs takes a set of allocations and internally marks which devices are // used. If a device is used more than once by the set of passed allocations, // the collision will be returned as true. 
@@ -92,39 +128,3 @@ func (d *DeviceAccounter) AddReserved(res *AllocatedDeviceResource) (collision b return } - -// NewDeviceAccounter returns a new device accounter. The node is used to -// populate the set of available devices based on what healthy device instances -// exist on the node. -func NewDeviceAccounter(n *Node) *DeviceAccounter { - numDevices := 0 - var devices []*NodeDeviceResource - - // COMPAT(0.11): Remove in 0.11 - if n.NodeResources != nil { - numDevices = len(n.NodeResources.Devices) - devices = n.NodeResources.Devices - } - - d := &DeviceAccounter{ - Devices: make(map[DeviceIdTuple]*DeviceAccounterInstance, numDevices), - } - - for _, dev := range devices { - id := *dev.ID() - d.Devices[id] = &DeviceAccounterInstance{ - Device: dev, - Instances: make(map[string]int, len(dev.Instances)), - } - for _, instance := range dev.Instances { - // Skip unhealthy devices as they aren't allocatable - if !instance.Healthy { - continue - } - - d.Devices[id].Instances[instance.ID] = 0 - } - } - - return d -} diff --git a/nomad/structs/funcs.go b/nomad/structs/funcs.go index 7493596f8..f79cb6c06 100644 --- a/nomad/structs/funcs.go +++ b/nomad/structs/funcs.go @@ -10,11 +10,10 @@ import ( "strconv" "strings" - "golang.org/x/crypto/blake2b" - multierror "github.com/hashicorp/go-multierror" lru "github.com/hashicorp/golang-lru" "github.com/hashicorp/nomad/acl" + "golang.org/x/crypto/blake2b" ) // MergeMultierrorWarnings takes job warnings and canonicalize warnings and @@ -98,8 +97,9 @@ func FilterTerminalAllocs(allocs []*Allocation) ([]*Allocation, map[string]*Allo // AllocsFit checks if a given set of allocations will fit on a node. // The netIdx can optionally be provided if its already been computed. // If the netIdx is provided, it is assumed that the client has already -// ensured there are no collisions. -func AllocsFit(node *Node, allocs []*Allocation, netIdx *NetworkIndex) (bool, string, *ComparableResources, error) { +// ensured there are no collisions. 
If checkDevices is set to true, we check if +// there is a device oversubscription. +func AllocsFit(node *Node, allocs []*Allocation, netIdx *NetworkIndex, checkDevices bool) (bool, string, *ComparableResources, error) { // Compute the utilization from zero used := new(ComparableResources) @@ -136,6 +136,14 @@ func AllocsFit(node *Node, allocs []*Allocation, netIdx *NetworkIndex) (bool, st return false, "bandwidth exceeded", used, nil } + // Check devices + if checkDevices { + accounter := NewDeviceAccounter(node) + if accounter.AddAllocs(allocs) { + return false, "device oversubscribed", used, nil + } + } + // Allocations fit! return true, "", used, nil } diff --git a/nomad/structs/funcs_test.go b/nomad/structs/funcs_test.go index 01571ba34..db81c67b5 100644 --- a/nomad/structs/funcs_test.go +++ b/nomad/structs/funcs_test.go @@ -116,7 +116,7 @@ func TestAllocsFit_PortsOvercommitted_Old(t *testing.T) { } // Should fit one allocation - fit, dim, _, err := AllocsFit(n, []*Allocation{a1}, nil) + fit, dim, _, err := AllocsFit(n, []*Allocation{a1}, nil, false) if err != nil { t.Fatalf("err: %v", err) } @@ -125,7 +125,7 @@ func TestAllocsFit_PortsOvercommitted_Old(t *testing.T) { } // Should not fit second allocation - fit, _, _, err = AllocsFit(n, []*Allocation{a1, a1}, nil) + fit, _, _, err = AllocsFit(n, []*Allocation{a1, a1}, nil, false) if err != nil { t.Fatalf("err: %v", err) } @@ -186,7 +186,7 @@ func TestAllocsFit_Old(t *testing.T) { } // Should fit one allocation - fit, _, used, err := AllocsFit(n, []*Allocation{a1}, nil) + fit, _, used, err := AllocsFit(n, []*Allocation{a1}, nil, false) require.NoError(err) require.True(fit) @@ -195,7 +195,7 @@ func TestAllocsFit_Old(t *testing.T) { require.EqualValues(2048, used.Flattened.Memory.MemoryMB) // Should not fit second allocation - fit, _, used, err = AllocsFit(n, []*Allocation{a1, a1}, nil) + fit, _, used, err = AllocsFit(n, []*Allocation{a1, a1}, nil, false) require.NoError(err) require.False(fit) @@ -256,7 +256,7 
@@ func TestAllocsFit_TerminalAlloc_Old(t *testing.T) { } // Should fit one allocation - fit, _, used, err := AllocsFit(n, []*Allocation{a1}, nil) + fit, _, used, err := AllocsFit(n, []*Allocation{a1}, nil, false) require.NoError(err) require.True(fit) @@ -267,7 +267,7 @@ func TestAllocsFit_TerminalAlloc_Old(t *testing.T) { // Should fit second allocation since it is terminal a2 := a1.Copy() a2.DesiredStatus = AllocDesiredStatusStop - fit, _, used, err = AllocsFit(n, []*Allocation{a1, a2}, nil) + fit, _, used, err = AllocsFit(n, []*Allocation{a1, a2}, nil, false) require.NoError(err) require.True(fit) @@ -341,7 +341,7 @@ func TestAllocsFit(t *testing.T) { } // Should fit one allocation - fit, _, used, err := AllocsFit(n, []*Allocation{a1}, nil) + fit, _, used, err := AllocsFit(n, []*Allocation{a1}, nil, false) require.NoError(err) require.True(fit) @@ -350,7 +350,7 @@ func TestAllocsFit(t *testing.T) { require.EqualValues(2048, used.Flattened.Memory.MemoryMB) // Should not fit second allocation - fit, _, used, err = AllocsFit(n, []*Allocation{a1, a1}, nil) + fit, _, used, err = AllocsFit(n, []*Allocation{a1, a1}, nil, false) require.NoError(err) require.False(fit) @@ -425,7 +425,7 @@ func TestAllocsFit_TerminalAlloc(t *testing.T) { } // Should fit one allocation - fit, _, used, err := AllocsFit(n, []*Allocation{a1}, nil) + fit, _, used, err := AllocsFit(n, []*Allocation{a1}, nil, false) require.NoError(err) require.True(fit) @@ -436,7 +436,7 @@ func TestAllocsFit_TerminalAlloc(t *testing.T) { // Should fit second allocation since it is terminal a2 := a1.Copy() a2.DesiredStatus = AllocDesiredStatusStop - fit, dim, used, err := AllocsFit(n, []*Allocation{a1, a2}, nil) + fit, dim, used, err := AllocsFit(n, []*Allocation{a1, a2}, nil, false) require.NoError(err) require.True(fit, dim) @@ -445,6 +445,72 @@ func TestAllocsFit_TerminalAlloc(t *testing.T) { require.EqualValues(2048, used.Flattened.Memory.MemoryMB) } +// Tests that AllocsFit detects device collisions +func 
TestAllocsFit_Devices(t *testing.T) { + require := require.New(t) + + n := MockNvidiaNode() + a1 := &Allocation{ + AllocatedResources: &AllocatedResources{ + Tasks: map[string]*AllocatedTaskResources{ + "web": { + Cpu: AllocatedCpuResources{ + CpuShares: 1000, + }, + Memory: AllocatedMemoryResources{ + MemoryMB: 1024, + }, + Devices: []*AllocatedDeviceResource{ + { + Type: "gpu", + Vendor: "nvidia", + Name: "1080ti", + DeviceIDs: []string{n.NodeResources.Devices[0].Instances[0].ID}, + }, + }, + }, + }, + Shared: AllocatedSharedResources{ + DiskMB: 5000, + }, + }, + } + a2 := a1.Copy() + a2.AllocatedResources.Tasks["web"] = &AllocatedTaskResources{ + Cpu: AllocatedCpuResources{ + CpuShares: 1000, + }, + Memory: AllocatedMemoryResources{ + MemoryMB: 1024, + }, + Devices: []*AllocatedDeviceResource{ + { + Type: "gpu", + Vendor: "nvidia", + Name: "1080ti", + DeviceIDs: []string{n.NodeResources.Devices[0].Instances[0].ID}, // Use the same ID + }, + }, + } + + // Should fit one allocation + fit, _, _, err := AllocsFit(n, []*Allocation{a1}, nil, true) + require.NoError(err) + require.True(fit) + + // Should not fit second allocation + fit, msg, _, err := AllocsFit(n, []*Allocation{a1, a1}, nil, true) + require.NoError(err) + require.False(fit) + require.Equal("device oversubscribed", msg) + + // Should not fit second allocation but won't detect since we disabled + // devices + fit, _, _, err = AllocsFit(n, []*Allocation{a1, a1}, nil, false) + require.NoError(err) + require.True(fit) +} + // COMPAT(0.11): Remove in 0.11 func TestScoreFit_Old(t *testing.T) { node := &Node{} diff --git a/scheduler/rank.go b/scheduler/rank.go index e313f1d6b..55d45c995 100644 --- a/scheduler/rank.go +++ b/scheduler/rank.go @@ -313,8 +313,8 @@ OUTER: // Add the resources we are trying to fit proposed = append(proposed, &structs.Allocation{AllocatedResources: total}) - // Check if these allocations fit - fit, dim, util, _ := structs.AllocsFit(option.Node, proposed, netIdx) + // Check if these 
allocations fit; if they do not, simply skip this node + fit, dim, util, _ := structs.AllocsFit(option.Node, proposed, netIdx, false) netIdx.Release() if !fit { // Skip the node if evictions are not enabled