From 9dfc27915c6b173b9bb382688d17bf39cdb8584d Mon Sep 17 00:00:00 2001 From: Preetha Appan Date: Wed, 14 Nov 2018 21:23:38 -0600 Subject: [PATCH 01/10] Initial implementation of device preemption --- nomad/structs/devices.go | 11 ++ scheduler/preemption.go | 51 ++++++++- scheduler/preemption_test.go | 195 +++++++++++++++++++++++++++++++++++ scheduler/rank.go | 34 +++++- 4 files changed, 287 insertions(+), 4 deletions(-) diff --git a/nomad/structs/devices.go b/nomad/structs/devices.go index 9ad363454..52cc21cae 100644 --- a/nomad/structs/devices.go +++ b/nomad/structs/devices.go @@ -129,3 +129,14 @@ func (d *DeviceAccounter) AddReserved(res *AllocatedDeviceResource) (collision b return } + +// FreeCount returns the number of free device instances +func (i *DeviceAccounterInstance) FreeCount() int { + count := 0 + for _, c := range i.Instances { + if c == 0 { + count++ + } + } + return count +} diff --git a/scheduler/preemption.go b/scheduler/preemption.go index f36c56cbc..9e9f00fff 100644 --- a/scheduler/preemption.go +++ b/scheduler/preemption.go @@ -113,9 +113,12 @@ type Preemptor struct { // currentAllocs is the candidate set used to find preemptible allocations currentAllocs []*structs.Allocation + + // ctx is the context from the scheduler stack + ctx Context } -func NewPreemptor(jobPriority int) *Preemptor { +func NewPreemptor(jobPriority int, ctx Context) *Preemptor { return &Preemptor{ currentPreemptions: make(map[structs.NamespacedID]map[string]int), jobPriority: jobPriority, @@ -435,6 +438,52 @@ OUTER: return filteredBestAllocs } +// PreemptForDevice tries to find allocations to preempt to meet devices needed +// This is called once per task when assigning devices to the task +func (p *Preemptor) PreemptForDevice(ask *structs.RequestedDevice, devAlloc *deviceAllocator) []*structs.Allocation { + // First group allocations by priority + allocsByPriority := filterAndGroupPreemptibleAllocs(p.jobPriority, p.currentAllocs) + neededCount := ask.Count + preemptedCount := 0 + var preemptedAllocs []*structs.Allocation + + for _, grpAllocs := range allocsByPriority { + for _, alloc := range grpAllocs.allocs { + for _, tr := range alloc.AllocatedResources.Tasks { + if len(tr.Devices) > 0 { + // Go through each assigned device group + for _, device := range tr.Devices { + devID := device.ID() + + // Look up the device instance from the device allocator + devInst := devAlloc.Devices[*devID] + + if devInst == nil { + continue + } + + // Check if the device matches the ask + if !nodeDeviceMatches(p.ctx, devInst.Device, ask) { + continue + } + + // Add to preemption list because this device matches + preemptedCount += len(device.DeviceIDs) + preemptedAllocs = append(preemptedAllocs, alloc) + + // Check if we met needed count + if preemptedCount+devInst.FreeCount() >= int(neededCount) { + return preemptedAllocs + } + } + } + } + } + } + + return nil +} + // basicResourceDistance computes a distance using a coordinate system. It compares resource fields like CPU/Memory and Disk. 
// Values emitted are in the range [0, maxFloat] func basicResourceDistance(resourceAsk *structs.ComparableResources, resourceUsed *structs.ComparableResources) float64 { diff --git a/scheduler/preemption_test.go b/scheduler/preemption_test.go index f2d83f1f4..fd4c12696 100644 --- a/scheduler/preemption_test.go +++ b/scheduler/preemption_test.go @@ -7,6 +7,7 @@ import ( "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" + psstructs "github.com/hashicorp/nomad/plugins/shared/structs" "github.com/stretchr/testify/require" ) @@ -162,6 +163,8 @@ func TestPreemption(t *testing.T) { // Create some persistent alloc ids to use in test cases allocIDs := []string{uuid.Generate(), uuid.Generate(), uuid.Generate(), uuid.Generate(), uuid.Generate()} + deviceIDs := []string{"dev1", "dev2", "dev3", "dev4"} + defaultNodeResources := &structs.NodeResources{ Cpu: structs.NodeCpuResources{ CpuShares: 4000, @@ -179,6 +182,55 @@ func TestPreemption(t *testing.T) { MBits: 1000, }, }, + Devices: []*structs.NodeDeviceResource{ + { + Type: "gpu", + Vendor: "nvidia", + Name: "1080ti", + Attributes: map[string]*psstructs.Attribute{ + "memory": psstructs.NewIntAttribute(11, psstructs.UnitGiB), + "cuda_cores": psstructs.NewIntAttribute(3584, ""), + "graphics_clock": psstructs.NewIntAttribute(1480, psstructs.UnitMHz), + "memory_bandwidth": psstructs.NewIntAttribute(11, psstructs.UnitGBPerS), + }, + Instances: []*structs.NodeDevice{ + { + ID: deviceIDs[0], + Healthy: true, + }, + { + ID: deviceIDs[1], + Healthy: true, + }, + { + ID: deviceIDs[2], + Healthy: true, + }, + { + ID: deviceIDs[3], + Healthy: true, + }, + }, + }, + { + Type: "fpga", + Vendor: "intel", + Name: "F100", + Attributes: map[string]*psstructs.Attribute{ + "memory": psstructs.NewIntAttribute(4, psstructs.UnitGiB), + }, + Instances: []*structs.NodeDevice{ + { + ID: "fpga1", + Healthy: true, + }, + { + ID: "fpga2", + Healthy: false, + }, + }, + }, + }, } reservedNodeResources := &structs.NodeReservedResources{ @@ -827,6 +879,128 @@ func TestPreemption(t *testing.T) { allocIDs[1]: {}, }, }, + { + desc: "Preemption with one device instance per alloc", + // Add allocations that use two device instances + currentAllocations: []*structs.Allocation{ + createAllocWithDevice(allocIDs[0], lowPrioJob, &structs.Resources{ + CPU: 500, + MemoryMB: 512, + DiskMB: 4 * 1024, + }, &structs.AllocatedDeviceResource{ + Type: "gpu", + Vendor: "nvidia", + Name: "1080ti", + DeviceIDs: []string{deviceIDs[0]}, + }), + createAllocWithDevice(allocIDs[1], lowPrioJob, &structs.Resources{ + CPU: 200, + MemoryMB: 512, + DiskMB: 4 * 1024, + }, &structs.AllocatedDeviceResource{ + Type: "gpu", + Vendor: "nvidia", + Name: "1080ti", + DeviceIDs: []string{deviceIDs[1]}, + })}, + nodeReservedCapacity: reservedNodeResources, + nodeCapacity: defaultNodeResources, + jobPriority: 100, + resourceAsk: &structs.Resources{ + CPU: 1000, + MemoryMB: 512, + DiskMB: 4 * 1024, + Devices: []*structs.RequestedDevice{ + { + Name: "nvidia/gpu/1080ti", + Count: 4, + }, + }, + }, + preemptedAllocIDs: map[string]struct{}{ + allocIDs[0]: {}, + allocIDs[1]: {}, + }, + }, + { + desc: "Preemption multiple devices used", + currentAllocations: []*structs.Allocation{ + createAllocWithDevice(allocIDs[0], lowPrioJob, &structs.Resources{ + CPU: 500, + MemoryMB: 512, + DiskMB: 4 * 1024, + }, &structs.AllocatedDeviceResource{ + Type: "gpu", + Vendor: "nvidia", + Name: "1080ti", + DeviceIDs: []string{deviceIDs[0], deviceIDs[1], deviceIDs[2], 
deviceIDs[3]}, + }), + createAllocWithDevice(allocIDs[1], lowPrioJob, &structs.Resources{ + CPU: 200, + MemoryMB: 512, + DiskMB: 4 * 1024, + }, &structs.AllocatedDeviceResource{ + Type: "fpga", + Vendor: "intel", + Name: "F100", + DeviceIDs: []string{"fpga1"}, + })}, + nodeReservedCapacity: reservedNodeResources, + nodeCapacity: defaultNodeResources, + jobPriority: 100, + resourceAsk: &structs.Resources{ + CPU: 1000, + MemoryMB: 512, + DiskMB: 4 * 1024, + Devices: []*structs.RequestedDevice{ + { + Name: "gpu", + Count: 4, + }, + }, + }, + preemptedAllocIDs: map[string]struct{}{ + allocIDs[0]: {}, + }, + }, + { + desc: "Device preemption not possible due to more instances needed than available", + currentAllocations: []*structs.Allocation{ + createAllocWithDevice(allocIDs[0], lowPrioJob, &structs.Resources{ + CPU: 500, + MemoryMB: 512, + DiskMB: 4 * 1024, + }, &structs.AllocatedDeviceResource{ + Type: "gpu", + Vendor: "nvidia", + Name: "1080ti", + DeviceIDs: []string{deviceIDs[0], deviceIDs[1], deviceIDs[2], deviceIDs[3]}, + }), + createAllocWithDevice(allocIDs[1], lowPrioJob, &structs.Resources{ + CPU: 200, + MemoryMB: 512, + DiskMB: 4 * 1024, + }, &structs.AllocatedDeviceResource{ + Type: "fpga", + Vendor: "intel", + Name: "F100", + DeviceIDs: []string{"fpga1"}, + })}, + nodeReservedCapacity: reservedNodeResources, + nodeCapacity: defaultNodeResources, + jobPriority: 100, + resourceAsk: &structs.Resources{ + CPU: 1000, + MemoryMB: 512, + DiskMB: 4 * 1024, + Devices: []*structs.RequestedDevice{ + { + Name: "gpu", + Count: 6, + }, + }, + }, + }, // This test case exercises the code path for a final filtering step that tries to // minimize the number of preemptible allocations { @@ -946,6 +1120,10 @@ func TestPreemption(t *testing.T) { // helper method to create allocations with given jobs and resources func createAlloc(id string, job *structs.Job, resource *structs.Resources) *structs.Allocation { + return createAllocWithDevice(id, job, resource, nil) +} + +func createAllocWithDevice(id string, job *structs.Job, resource *structs.Resources, allocatedDevices *structs.AllocatedDeviceResource) *structs.Allocation { alloc := &structs.Allocation{ ID: id, Job: job, @@ -959,6 +1137,23 @@ func createAlloc(id string, job *structs.Job, resource *structs.Resources) *stru DesiredStatus: structs.AllocDesiredStatusRun, ClientStatus: structs.AllocClientStatusRunning, TaskGroup: "web", + AllocatedResources: &structs.AllocatedResources{ + Tasks: map[string]*structs.AllocatedTaskResources{ + "web": &structs.AllocatedTaskResources{ + Cpu: structs.AllocatedCpuResources{ + CpuShares: int64(resource.CPU), + }, + Memory: structs.AllocatedMemoryResources{ + MemoryMB: int64(resource.MemoryMB), + }, + Networks: resource.Networks, + }, + }, + }, + } + + if allocatedDevices != nil { + alloc.AllocatedResources.Tasks["web"].Devices = []*structs.AllocatedDeviceResource{allocatedDevices} } return alloc } diff --git a/scheduler/rank.go b/scheduler/rank.go index 7d2b4ad4e..d892d317f 100644 --- a/scheduler/rank.go +++ b/scheduler/rank.go @@ -211,7 +211,7 @@ OUTER: var allocsToPreempt []*structs.Allocation // Initialize preemptor with node - preemptor := NewPreemptor(iter.priority) + preemptor := NewPreemptor(iter.priority, iter.ctx) preemptor.SetNode(option.Node) // Count the number of existing preemptions @@ -285,8 +285,36 @@ OUTER: for _, req := range task.Resources.Devices { offer, sumAffinities, err := devAllocator.AssignDevice(req) if offer == nil { - iter.ctx.Metrics().ExhaustedNode(option.Node, fmt.Sprintf("devices: 
%s", err)) - continue OUTER + // If eviction is not enabled, mark this node as exhausted and continue + if !iter.evict { + iter.ctx.Metrics().ExhaustedNode(option.Node, fmt.Sprintf("devices: %s", err)) + continue OUTER + } + + // Attempt preemption + preemptor.SetCandidates(proposed) + devicePreemptions := preemptor.PreemptForDevice(req, devAllocator) + + if devicePreemptions == nil { + iter.ctx.Logger().Named("binpack").Error(fmt.Sprintf("unable to meet device resource need after attempting preemption:%v", req)) + netIdx.Release() + continue OUTER + } + allocsToPreempt = append(allocsToPreempt, devicePreemptions...) + + // First subtract out preempted allocations + proposed = structs.RemoveAllocs(proposed, allocsToPreempt) + + // Reset the device allocator with new set of proposed allocs + devAllocator := newDeviceAllocator(iter.ctx, option.Node) + devAllocator.AddAllocs(proposed) + + // Try offer again + offer, sumAffinities, err = devAllocator.AssignDevice(req) + if offer == nil { + iter.ctx.Logger().Named("binpack").Error(fmt.Sprintf("unexpected error, unable to create device offer after preempting:%v", err)) + continue OUTER + } } // Store the resource From 813143fee5c8a0261867e4c33597b7695b8613f5 Mon Sep 17 00:00:00 2001 From: Preetha Appan Date: Thu, 15 Nov 2018 12:27:32 -0600 Subject: [PATCH 02/10] fix linting --- scheduler/preemption_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scheduler/preemption_test.go b/scheduler/preemption_test.go index fd4c12696..d2e10f3bf 100644 --- a/scheduler/preemption_test.go +++ b/scheduler/preemption_test.go @@ -1139,7 +1139,7 @@ func createAllocWithDevice(id string, job *structs.Job, resource *structs.Resour TaskGroup: "web", AllocatedResources: &structs.AllocatedResources{ Tasks: map[string]*structs.AllocatedTaskResources{ - "web": &structs.AllocatedTaskResources{ + "web": { Cpu: structs.AllocatedCpuResources{ CpuShares: int64(resource.CPU), }, From 7f2826097de9bea9f241403695936e85b694957a Mon Sep 17 00:00:00 2001 From: Preetha Appan Date: Fri, 16 Nov 2018 20:32:10 -0600 Subject: [PATCH 03/10] Fix preemption logic bug, need to group allocations by device first. 
This ensures that the set of allocations chosen for preemption all share the same device ID.
---
 scheduler/preemption.go      | 113 +++++++++++++++++++++++++----------
 scheduler/preemption_test.go | 113 ++++++++++++++++++++++++++++++++++-
 2 files changed, 193 insertions(+), 33 deletions(-)

diff --git a/scheduler/preemption.go b/scheduler/preemption.go
index 9e9f00fff..6d6b929c6 100644
--- a/scheduler/preemption.go
+++ b/scheduler/preemption.go
@@ -123,6 +123,7 @@ func NewPreemptor(jobPriority int, ctx Context) *Preemptor {
 		currentPreemptions: make(map[structs.NamespacedID]map[string]int),
 		jobPriority:        jobPriority,
 		allocDetails:       make(map[string]*allocInfo),
+		ctx:                ctx,
 	}
 }
@@ -438,45 +439,95 @@ OUTER:
 	return filteredBestAllocs
 }
 
+// allocDeviceGroup represents a group of allocs that share a device
+type allocDeviceGroup struct {
+	allocs []*structs.Allocation
+
+	// deviceInstances tracks the number of instances used per alloc
+	deviceInstances map[string]int
+}
+
+func newAllocDeviceGroup() *allocDeviceGroup {
+	return &allocDeviceGroup{
+		deviceInstances: make(map[string]int),
+	}
+}
+
 // PreemptForDevice tries to find allocations to preempt to meet devices needed
 // This is called once per task when assigning devices to the task
 func (p *Preemptor) PreemptForDevice(ask *structs.RequestedDevice, devAlloc *deviceAllocator) []*structs.Allocation {
-	// First group allocations by priority
-	allocsByPriority := filterAndGroupPreemptibleAllocs(p.jobPriority, p.currentAllocs)
-	neededCount := ask.Count
-	preemptedCount := 0
-	var preemptedAllocs []*structs.Allocation
 
-	for _, grpAllocs := range allocsByPriority {
-		for _, alloc := range grpAllocs.allocs {
-			for _, tr := range alloc.AllocatedResources.Tasks {
-				if len(tr.Devices) > 0 {
-					// Go through each assigned device group
-					for _, device := range tr.Devices {
-						devID := device.ID()
+	// Group allocations by device, tracking the number of
+	// instances used in each device by alloc id
+	deviceToAllocs := make(map[structs.DeviceIdTuple]*allocDeviceGroup)
+	for _, alloc := range p.currentAllocs {
+		for _, tr := range alloc.AllocatedResources.Tasks {
+			// Ignore allocs that don't use devices
+			if len(tr.Devices) == 0 {
+				continue
+			}
 
-						// Look up the device instance from the device allocator
-						devInst := devAlloc.Devices[*devID]
+			// Go through each assigned device group
+			for _, device := range tr.Devices {
+				devID := device.ID()
 
-						if devInst == nil {
-							continue
-						}
+				// Look up the device instance from the device allocator
+				deviceIdTuple := *devID
+				devInst := devAlloc.Devices[deviceIdTuple]
 
-						// Check if the device matches the ask
-						if !nodeDeviceMatches(p.ctx, devInst.Device, ask) {
-							continue
-						}
-
-						// Add to preemption list because this device matches
-						preemptedCount += len(device.DeviceIDs)
-						preemptedAllocs = append(preemptedAllocs, alloc)
-
-						// Check if we met needed count
-						if preemptedCount+devInst.FreeCount() >= int(neededCount) {
-							return preemptedAllocs
-						}
-					}
+				if devInst == nil {
+					continue
 				}
+
+				// Ignore if the device doesn't match the ask
+				if !nodeDeviceMatches(p.ctx, devInst.Device, ask) {
+					continue
+				}
+
+				// Store both the alloc and the number of instances used
+				// in our tracking map
+				allocDeviceGrp := deviceToAllocs[deviceIdTuple]
+				if allocDeviceGrp == nil {
+					allocDeviceGrp = newAllocDeviceGroup()
+					deviceToAllocs[deviceIdTuple] = allocDeviceGrp
+				}
+				allocDeviceGrp.allocs = append(allocDeviceGrp.allocs, alloc)
+				devCount := allocDeviceGrp.deviceInstances[alloc.ID]
+				devCount += len(device.DeviceIDs)
+
allocDeviceGrp.deviceInstances[alloc.ID] = devCount + } + } + } + + neededCount := ask.Count + + // Examine matching allocs by device + for deviceIDTuple, allocsGrp := range deviceToAllocs { + + // First group and sort allocations using this device by priority + allocsByPriority := filterAndGroupPreemptibleAllocs(p.jobPriority, allocsGrp.allocs) + + // Reset preempted count for this device + preemptedCount := 0 + + // Initialize slice of preempted allocations + var preemptedAllocs []*structs.Allocation + + for _, grpAllocs := range allocsByPriority { + for _, alloc := range grpAllocs.allocs { + + // Look up the device instance from the device allocator + devInst := devAlloc.Devices[deviceIDTuple] + + // Add to preemption list because this device matches + preemptedCount += allocsGrp.deviceInstances[alloc.ID] + preemptedAllocs = append(preemptedAllocs, alloc) + + // Check if we met needed count + if preemptedCount+devInst.FreeCount() >= int(neededCount) { + return preemptedAllocs + } + } } } diff --git a/scheduler/preemption_test.go b/scheduler/preemption_test.go index d2e10f3bf..b526cac3c 100644 --- a/scheduler/preemption_test.go +++ b/scheduler/preemption_test.go @@ -4,6 +4,8 @@ import ( "fmt" "testing" + "strconv" + "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" @@ -163,7 +165,10 @@ func TestPreemption(t *testing.T) { // Create some persistent alloc ids to use in test cases allocIDs := []string{uuid.Generate(), uuid.Generate(), uuid.Generate(), uuid.Generate(), uuid.Generate()} - deviceIDs := []string{"dev1", "dev2", "dev3", "dev4"} + var deviceIDs []string + for i := 0; i < 10; i++ { + deviceIDs = append(deviceIDs, "dev"+strconv.Itoa(i)) + } defaultNodeResources := &structs.NodeResources{ Cpu: structs.NodeCpuResources{ @@ -212,6 +217,35 @@ func TestPreemption(t *testing.T) { }, }, }, + { + Type: "gpu", + Vendor: "nvidia", + Name: "2080ti", + Attributes: map[string]*psstructs.Attribute{ + "memory": psstructs.NewIntAttribute(11, psstructs.UnitGiB), + "cuda_cores": psstructs.NewIntAttribute(3584, ""), + "graphics_clock": psstructs.NewIntAttribute(1480, psstructs.UnitMHz), + "memory_bandwidth": psstructs.NewIntAttribute(11, psstructs.UnitGBPerS), + }, + Instances: []*structs.NodeDevice{ + { + ID: deviceIDs[4], + Healthy: true, + }, + { + ID: deviceIDs[5], + Healthy: true, + }, + { + ID: deviceIDs[6], + Healthy: true, + }, + { + ID: deviceIDs[7], + Healthy: true, + }, + }, + }, { Type: "fpga", Vendor: "intel", @@ -954,7 +988,7 @@ func TestPreemption(t *testing.T) { DiskMB: 4 * 1024, Devices: []*structs.RequestedDevice{ { - Name: "gpu", + Name: "nvidia/gpu/1080ti", Count: 4, }, }, @@ -963,6 +997,81 @@ func TestPreemption(t *testing.T) { allocIDs[0]: {}, }, }, + { + // This test cases creates allocations across two GPUs + // Both GPUs are eligible for the task, but only allocs sharing the + // same device should be chosen for preemption + desc: "Preemption with allocs across multiple devices that match", + currentAllocations: []*structs.Allocation{ + createAllocWithDevice(allocIDs[0], lowPrioJob, &structs.Resources{ + CPU: 500, + MemoryMB: 512, + DiskMB: 4 * 1024, + }, &structs.AllocatedDeviceResource{ + Type: "gpu", + Vendor: "nvidia", + Name: "1080ti", + DeviceIDs: []string{deviceIDs[0], deviceIDs[1]}, + }), + createAllocWithDevice(allocIDs[1], highPrioJob, &structs.Resources{ + CPU: 200, + MemoryMB: 100, + DiskMB: 4 * 1024, + }, &structs.AllocatedDeviceResource{ + Type: "gpu", + Vendor: "nvidia", + Name: "1080ti", 
+ DeviceIDs: []string{deviceIDs[2]}, + }), + createAllocWithDevice(allocIDs[2], lowPrioJob, &structs.Resources{ + CPU: 200, + MemoryMB: 256, + DiskMB: 4 * 1024, + }, &structs.AllocatedDeviceResource{ + Type: "gpu", + Vendor: "nvidia", + Name: "2080ti", + DeviceIDs: []string{deviceIDs[4], deviceIDs[5]}, + }), + createAllocWithDevice(allocIDs[3], lowPrioJob, &structs.Resources{ + CPU: 100, + MemoryMB: 256, + DiskMB: 4 * 1024, + }, &structs.AllocatedDeviceResource{ + Type: "gpu", + Vendor: "nvidia", + Name: "2080ti", + DeviceIDs: []string{deviceIDs[6], deviceIDs[7]}, + }), + createAllocWithDevice(allocIDs[4], lowPrioJob, &structs.Resources{ + CPU: 200, + MemoryMB: 512, + DiskMB: 4 * 1024, + }, &structs.AllocatedDeviceResource{ + Type: "fpga", + Vendor: "intel", + Name: "F100", + DeviceIDs: []string{"fpga1"}, + })}, + nodeReservedCapacity: reservedNodeResources, + nodeCapacity: defaultNodeResources, + jobPriority: 100, + resourceAsk: &structs.Resources{ + CPU: 1000, + MemoryMB: 512, + DiskMB: 4 * 1024, + Devices: []*structs.RequestedDevice{ + { + Name: "gpu", + Count: 4, + }, + }, + }, + preemptedAllocIDs: map[string]struct{}{ + allocIDs[2]: {}, + allocIDs[3]: {}, + }, + }, { desc: "Device preemption not possible due to more instances needed than available", currentAllocations: []*structs.Allocation{ From e8088e404b56bb84223eeb2118bb0cafaed67909 Mon Sep 17 00:00:00 2001 From: Preetha Appan Date: Fri, 16 Nov 2018 20:45:52 -0600 Subject: [PATCH 04/10] Fix formatting --- scheduler/preemption.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/scheduler/preemption.go b/scheduler/preemption.go index 6d6b929c6..a46d45612 100644 --- a/scheduler/preemption.go +++ b/scheduler/preemption.go @@ -503,7 +503,6 @@ func (p *Preemptor) PreemptForDevice(ask *structs.RequestedDevice, devAlloc *dev // Examine matching allocs by device for deviceIDTuple, allocsGrp := range deviceToAllocs { - // First group and sort allocations using this device by priority allocsByPriority := filterAndGroupPreemptibleAllocs(p.jobPriority, allocsGrp.allocs) @@ -515,7 +514,6 @@ func (p *Preemptor) PreemptForDevice(ask *structs.RequestedDevice, devAlloc *dev for _, grpAllocs := range allocsByPriority { for _, alloc := range grpAllocs.allocs { - // Look up the device instance from the device allocator devInst := devAlloc.Devices[deviceIDTuple] @@ -527,7 +525,6 @@ func (p *Preemptor) PreemptForDevice(ask *structs.RequestedDevice, devAlloc *dev if preemptedCount+devInst.FreeCount() >= int(neededCount) { return preemptedAllocs } - } } } From 026f976761611d4fbcac4280f745eaff164b7296 Mon Sep 17 00:00:00 2001 From: Mahmood Ali Date: Tue, 27 Nov 2018 09:59:57 -0600 Subject: [PATCH 05/10] code review suggestion Co-Authored-By: preetapan --- scheduler/preemption.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scheduler/preemption.go b/scheduler/preemption.go index a46d45612..64b549034 100644 --- a/scheduler/preemption.go +++ b/scheduler/preemption.go @@ -472,7 +472,7 @@ func (p *Preemptor) PreemptForDevice(ask *structs.RequestedDevice, devAlloc *dev devID := device.ID() // Look up the device instance from the device allocator - deviceIdTuple := *devID + deviceIdTuple := *device.ID() devInst := devAlloc.Devices[deviceIdTuple] if devInst == nil { From e7257fe5bed18b376898d42ae1c674d944467701 Mon Sep 17 00:00:00 2001 From: Mahmood Ali Date: Tue, 27 Nov 2018 10:03:11 -0600 Subject: [PATCH 06/10] Simplify map count update logic Co-Authored-By: preetapan --- scheduler/preemption.go | 2 +- 1 file changed, 1 insertion(+), 1 
deletion(-) diff --git a/scheduler/preemption.go b/scheduler/preemption.go index 64b549034..444db618c 100644 --- a/scheduler/preemption.go +++ b/scheduler/preemption.go @@ -494,7 +494,7 @@ func (p *Preemptor) PreemptForDevice(ask *structs.RequestedDevice, devAlloc *dev allocDeviceGrp.allocs = append(allocDeviceGrp.allocs, alloc) devCount := allocDeviceGrp.deviceInstances[alloc.ID] devCount += len(device.DeviceIDs) - allocDeviceGrp.deviceInstances[alloc.ID] = devCount + allocDeviceGrp.deviceInstances[alloc.ID] += len(device.DeviceIDs) } } } From e023b367fa30678d62d753698ff9fdf64eb619a5 Mon Sep 17 00:00:00 2001 From: Preetha Appan Date: Tue, 27 Nov 2018 11:02:06 -0600 Subject: [PATCH 07/10] addresses some code clarity review comments --- scheduler/preemption.go | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/scheduler/preemption.go b/scheduler/preemption.go index 444db618c..ec7973748 100644 --- a/scheduler/preemption.go +++ b/scheduler/preemption.go @@ -439,16 +439,16 @@ OUTER: return filteredBestAllocs } -// allocDeviceGroup represents a group of allocs that share a device -type allocDeviceGroup struct { +// deviceGroupAllocs represents a group of allocs that share a device +type deviceGroupAllocs struct { allocs []*structs.Allocation // deviceInstances tracks the number of instances used per alloc deviceInstances map[string]int } -func newAllocDeviceGroup() *allocDeviceGroup { - return &allocDeviceGroup{ +func newAllocDeviceGroup() *deviceGroupAllocs { + return &deviceGroupAllocs{ deviceInstances: make(map[string]int), } } @@ -459,7 +459,7 @@ func (p *Preemptor) PreemptForDevice(ask *structs.RequestedDevice, devAlloc *dev // Group allocations by device, tracking the number of // instances used in each device by alloc id - deviceToAllocs := make(map[structs.DeviceIdTuple]*allocDeviceGroup) + deviceToAllocs := make(map[structs.DeviceIdTuple]*deviceGroupAllocs) for _, alloc := range p.currentAllocs { for _, tr := range alloc.AllocatedResources.Tasks { // Ignore allocs that don't use devices @@ -469,8 +469,6 @@ func (p *Preemptor) PreemptForDevice(ask *structs.RequestedDevice, devAlloc *dev // Go through each assigned device group for _, device := range tr.Devices { - devID := device.ID() - // Look up the device instance from the device allocator deviceIdTuple := *device.ID() devInst := devAlloc.Devices[deviceIdTuple] From 4afd512f45372800eb0ca3ff5da10458a8d1989d Mon Sep 17 00:00:00 2001 From: Preetha Appan Date: Mon, 3 Dec 2018 08:31:41 -0600 Subject: [PATCH 08/10] use structured logging everywhere consistently --- scheduler/preemption.go | 5 ++--- scheduler/rank.go | 8 ++++---- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/scheduler/preemption.go b/scheduler/preemption.go index ec7973748..18d47eaca 100644 --- a/scheduler/preemption.go +++ b/scheduler/preemption.go @@ -454,7 +454,7 @@ func newAllocDeviceGroup() *deviceGroupAllocs { } // PreemptForDevice tries to find allocations to preempt to meet devices needed -// This is called once per task when assigning devices to the task +// This is called once per device request when assigning devices to the task func (p *Preemptor) PreemptForDevice(ask *structs.RequestedDevice, devAlloc *deviceAllocator) []*structs.Allocation { // Group allocations by device, tracking the number of @@ -473,6 +473,7 @@ func (p *Preemptor) PreemptForDevice(ask *structs.RequestedDevice, devAlloc *dev deviceIdTuple := *device.ID() devInst := devAlloc.Devices[deviceIdTuple] + // devInst can be nil if the device is no longer 
healthy if devInst == nil { continue } @@ -490,8 +491,6 @@ func (p *Preemptor) PreemptForDevice(ask *structs.RequestedDevice, devAlloc *dev deviceToAllocs[deviceIdTuple] = allocDeviceGrp } allocDeviceGrp.allocs = append(allocDeviceGrp.allocs, alloc) - devCount := allocDeviceGrp.deviceInstances[alloc.ID] - devCount += len(device.DeviceIDs) allocDeviceGrp.deviceInstances[alloc.ID] += len(device.DeviceIDs) } } diff --git a/scheduler/rank.go b/scheduler/rank.go index d892d317f..0b531e689 100644 --- a/scheduler/rank.go +++ b/scheduler/rank.go @@ -251,7 +251,7 @@ OUTER: netPreemptions := preemptor.PreemptForNetwork(ask, netIdx) if netPreemptions == nil { - iter.ctx.Logger().Named("binpack").Error(fmt.Sprintf("unable to meet network resource %v after preemption", ask)) + iter.ctx.Logger().Named("binpack").Error("preemption not possible ", "network_resource", ask) netIdx.Release() continue OUTER } @@ -268,7 +268,7 @@ OUTER: offer, err = netIdx.AssignNetwork(ask) if offer == nil { - iter.ctx.Logger().Named("binpack").Error(fmt.Sprintf("unexpected error, unable to create offer after preempting:%v", err)) + iter.ctx.Logger().Named("binpack").Error("unexpected error, unable to create network offer after considering preemption", "error", err) netIdx.Release() continue OUTER } @@ -296,7 +296,7 @@ OUTER: devicePreemptions := preemptor.PreemptForDevice(req, devAllocator) if devicePreemptions == nil { - iter.ctx.Logger().Named("binpack").Error(fmt.Sprintf("unable to meet device resource need after attempting preemption:%v", req)) + iter.ctx.Logger().Named("binpack").Error("preemption not possible", "requested_device", req) netIdx.Release() continue OUTER } @@ -312,7 +312,7 @@ OUTER: // Try offer again offer, sumAffinities, err = devAllocator.AssignDevice(req) if offer == nil { - iter.ctx.Logger().Named("binpack").Error(fmt.Sprintf("unexpected error, unable to create device offer after preempting:%v", err)) + iter.ctx.Logger().Named("binpack").Error("unexpected error, unable to create device offer after considering preemption", "error", err) continue OUTER } } From 3921793030877804921108bdb4bdcb0a46b25bb0 Mon Sep 17 00:00:00 2001 From: Preetha Appan Date: Fri, 7 Dec 2018 18:35:47 -0600 Subject: [PATCH 09/10] Score combinations of allocs from multiple devices for preemption --- scheduler/preemption.go | 37 ++++++++++++++++- scheduler/preemption_test.go | 77 +++++++++++++++++++++++++++++++++++- 2 files changed, 112 insertions(+), 2 deletions(-) diff --git a/scheduler/preemption.go b/scheduler/preemption.go index 18d47eaca..90edcd94d 100644 --- a/scheduler/preemption.go +++ b/scheduler/preemption.go @@ -498,6 +498,7 @@ func (p *Preemptor) PreemptForDevice(ask *structs.RequestedDevice, devAlloc *dev neededCount := ask.Count + var preemptionOptions [][]*structs.Allocation // Examine matching allocs by device for deviceIDTuple, allocsGrp := range deviceToAllocs { // First group and sort allocations using this device by priority @@ -520,15 +521,49 @@ func (p *Preemptor) PreemptForDevice(ask *structs.RequestedDevice, devAlloc *dev // Check if we met needed count if preemptedCount+devInst.FreeCount() >= int(neededCount) { - return preemptedAllocs + preemptionOptions = append(preemptionOptions, preemptedAllocs) } } } } + // Find the combination of allocs with lowest net priority + if len(preemptionOptions) > 0 { + return selectBestAllocs(preemptionOptions) + } + return nil } +// selectBestAllocs finds the best allocations based on minimal net priority amongst +// all options. 
The net priority is the sum of unique priorities in each option +func selectBestAllocs(allocSets [][]*structs.Allocation) []*structs.Allocation { + if len(allocSets) == 1 { + return allocSets[0] + } + bestPriority := math.MaxInt32 + var bestAllocs []*structs.Allocation + + for _, allocs := range allocSets { + // Find unique priorities and add them + priorities := map[int]struct{}{} + netPriority := 0 + for _, alloc := range allocs { + _, ok := priorities[alloc.Job.Priority] + if !ok { + priorities[alloc.Job.Priority] = struct{}{} + netPriority += alloc.Job.Priority + } + } + if netPriority < bestPriority { + bestPriority = netPriority + bestAllocs = allocs + } + + } + return bestAllocs +} + // basicResourceDistance computes a distance using a coordinate system. It compares resource fields like CPU/Memory and Disk. // Values emitted are in the range [0, maxFloat] func basicResourceDistance(resourceAsk *structs.ComparableResources, resourceUsed *structs.ComparableResources) float64 { diff --git a/scheduler/preemption_test.go b/scheduler/preemption_test.go index b526cac3c..edbb71fb0 100644 --- a/scheduler/preemption_test.go +++ b/scheduler/preemption_test.go @@ -160,7 +160,7 @@ func TestPreemption(t *testing.T) { lowPrioJob.Priority = 30 lowPrioJob2 := mock.Job() - lowPrioJob2.Priority = 30 + lowPrioJob2.Priority = 40 // Create some persistent alloc ids to use in test cases allocIDs := []string{uuid.Generate(), uuid.Generate(), uuid.Generate(), uuid.Generate(), uuid.Generate()} @@ -1072,6 +1072,81 @@ func TestPreemption(t *testing.T) { allocIDs[3]: {}, }, }, + { + // This test cases creates allocations across two GPUs + // Both GPUs are eligible for the task, but only allocs with the lower + // priority are chosen + desc: "Preemption with allocs across multiple devices that match", + currentAllocations: []*structs.Allocation{ + createAllocWithDevice(allocIDs[0], lowPrioJob, &structs.Resources{ + CPU: 500, + MemoryMB: 512, + DiskMB: 4 * 1024, + }, &structs.AllocatedDeviceResource{ + Type: "gpu", + Vendor: "nvidia", + Name: "1080ti", + DeviceIDs: []string{deviceIDs[0], deviceIDs[1]}, + }), + createAllocWithDevice(allocIDs[1], lowPrioJob2, &structs.Resources{ + CPU: 200, + MemoryMB: 100, + DiskMB: 4 * 1024, + }, &structs.AllocatedDeviceResource{ + Type: "gpu", + Vendor: "nvidia", + Name: "1080ti", + DeviceIDs: []string{deviceIDs[2], deviceIDs[3]}, + }), + createAllocWithDevice(allocIDs[2], lowPrioJob, &structs.Resources{ + CPU: 200, + MemoryMB: 256, + DiskMB: 4 * 1024, + }, &structs.AllocatedDeviceResource{ + Type: "gpu", + Vendor: "nvidia", + Name: "2080ti", + DeviceIDs: []string{deviceIDs[4], deviceIDs[5]}, + }), + createAllocWithDevice(allocIDs[3], lowPrioJob, &structs.Resources{ + CPU: 100, + MemoryMB: 256, + DiskMB: 4 * 1024, + }, &structs.AllocatedDeviceResource{ + Type: "gpu", + Vendor: "nvidia", + Name: "2080ti", + DeviceIDs: []string{deviceIDs[6], deviceIDs[7]}, + }), + createAllocWithDevice(allocIDs[4], lowPrioJob, &structs.Resources{ + CPU: 200, + MemoryMB: 512, + DiskMB: 4 * 1024, + }, &structs.AllocatedDeviceResource{ + Type: "fpga", + Vendor: "intel", + Name: "F100", + DeviceIDs: []string{"fpga1"}, + })}, + nodeReservedCapacity: reservedNodeResources, + nodeCapacity: defaultNodeResources, + jobPriority: 100, + resourceAsk: &structs.Resources{ + CPU: 1000, + MemoryMB: 512, + DiskMB: 4 * 1024, + Devices: []*structs.RequestedDevice{ + { + Name: "gpu", + Count: 4, + }, + }, + }, + preemptedAllocIDs: map[string]struct{}{ + allocIDs[2]: {}, + allocIDs[3]: {}, + }, + }, { desc: "Device 
preemption not possible due to more instances needed than available", currentAllocations: []*structs.Allocation{ From e7162e8bd8aee7521a1a8315c6720a2be59f0016 Mon Sep 17 00:00:00 2001 From: Preetha Appan Date: Tue, 11 Dec 2018 10:12:18 -0600 Subject: [PATCH 10/10] Early continue after meeting needed count Also adds another optimization that filters out un-needed allocations as a final filtering step --- scheduler/preemption.go | 49 +++++++++++++++++++++++++++--------- scheduler/preemption_test.go | 18 +++++++++++-- 2 files changed, 53 insertions(+), 14 deletions(-) diff --git a/scheduler/preemption.go b/scheduler/preemption.go index 90edcd94d..d65c2f71c 100644 --- a/scheduler/preemption.go +++ b/scheduler/preemption.go @@ -498,8 +498,9 @@ func (p *Preemptor) PreemptForDevice(ask *structs.RequestedDevice, devAlloc *dev neededCount := ask.Count - var preemptionOptions [][]*structs.Allocation + var preemptionOptions []*deviceGroupAllocs // Examine matching allocs by device +OUTER: for deviceIDTuple, allocsGrp := range deviceToAllocs { // First group and sort allocations using this device by priority allocsByPriority := filterAndGroupPreemptibleAllocs(p.jobPriority, allocsGrp.allocs) @@ -521,7 +522,11 @@ func (p *Preemptor) PreemptForDevice(ask *structs.RequestedDevice, devAlloc *dev // Check if we met needed count if preemptedCount+devInst.FreeCount() >= int(neededCount) { - preemptionOptions = append(preemptionOptions, preemptedAllocs) + preemptionOptions = append(preemptionOptions, &deviceGroupAllocs{ + allocs: preemptedAllocs, + deviceInstances: allocsGrp.deviceInstances, + }) + continue OUTER } } } @@ -529,7 +534,7 @@ func (p *Preemptor) PreemptForDevice(ask *structs.RequestedDevice, devAlloc *dev // Find the combination of allocs with lowest net priority if len(preemptionOptions) > 0 { - return selectBestAllocs(preemptionOptions) + return selectBestAllocs(preemptionOptions, int(neededCount)) } return nil @@ -537,18 +542,39 @@ func (p *Preemptor) PreemptForDevice(ask *structs.RequestedDevice, devAlloc *dev // selectBestAllocs finds the best allocations based on minimal net priority amongst // all options. The net priority is the sum of unique priorities in each option -func selectBestAllocs(allocSets [][]*structs.Allocation) []*structs.Allocation { - if len(allocSets) == 1 { - return allocSets[0] - } +func selectBestAllocs(preemptionOptions []*deviceGroupAllocs, neededCount int) []*structs.Allocation { bestPriority := math.MaxInt32 var bestAllocs []*structs.Allocation - for _, allocs := range allocSets { - // Find unique priorities and add them + // We iterate over allocations in priority order, so its possible + // that we have more allocations than needed to meet the needed count. + // e.g we need 4 instances, and we get 3 from a priority 10 alloc, and 4 from + // a priority 20 alloc. We should filter out the priority 10 alloc in that case. 
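+	// (Sorting by instance count, descending, is what makes that filtering
+	// work: the priority 20 alloc is considered first and by itself meets
+	// the needed count of 4, so the priority 10 alloc is never added and
+	// the option's net priority is just 20.)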
+	// This loop does a filter and chooses the set with the smallest net priority
+	for _, allocGrp := range preemptionOptions {
+		// Find unique priorities and add them to calculate net priority
 		priorities := map[int]struct{}{}
 		netPriority := 0
-		for _, alloc := range allocs {
+
+		devInst := allocGrp.deviceInstances
+		var filteredAllocs []*structs.Allocation
+
+		// Sort by number of device instances used, descending
+		sort.Slice(allocGrp.allocs, func(i, j int) bool {
+			instanceCount1 := devInst[allocGrp.allocs[i].ID]
+			instanceCount2 := devInst[allocGrp.allocs[j].ID]
+			return instanceCount1 > instanceCount2
+		})
+
+		// Filter and calculate net priority
+		preemptedInstanceCount := 0
+		for _, alloc := range allocGrp.allocs {
+			if preemptedInstanceCount >= neededCount {
+				break
+			}
+			instanceCount := devInst[alloc.ID]
+			preemptedInstanceCount += instanceCount
+			filteredAllocs = append(filteredAllocs, alloc)
 			_, ok := priorities[alloc.Job.Priority]
 			if !ok {
 				priorities[alloc.Job.Priority] = struct{}{}
 				netPriority += alloc.Job.Priority
@@ -557,9 +583,8 @@ func selectBestAllocs(allocSets [][]*structs.Allocation) []*structs.Allocation {
 		}
 		if netPriority < bestPriority {
 			bestPriority = netPriority
-			bestAllocs = allocs
+			bestAllocs = filteredAllocs
 		}
-
 	}
 	return bestAllocs
 }
diff --git a/scheduler/preemption_test.go b/scheduler/preemption_test.go
index edbb71fb0..e54d5c77d 100644
--- a/scheduler/preemption_test.go
+++ b/scheduler/preemption_test.go
@@ -163,7 +163,7 @@ func TestPreemption(t *testing.T) {
 	lowPrioJob2.Priority = 40
 
 	// Create some persistent alloc ids to use in test cases
-	allocIDs := []string{uuid.Generate(), uuid.Generate(), uuid.Generate(), uuid.Generate(), uuid.Generate()}
+	allocIDs := []string{uuid.Generate(), uuid.Generate(), uuid.Generate(), uuid.Generate(), uuid.Generate(), uuid.Generate()}
 
 	var deviceIDs []string
 	for i := 0; i < 10; i++ {
@@ -244,6 +244,10 @@ func TestPreemption(t *testing.T) {
 						ID:      deviceIDs[7],
 						Healthy: true,
 					},
+					{
+						ID:      deviceIDs[8],
+						Healthy: true,
+					},
 				},
 			},
 			{
@@ -1076,7 +1080,7 @@ func TestPreemption(t *testing.T) {
 			// This test cases creates allocations across two GPUs
 			// Both GPUs are eligible for the task, but only allocs with the lower
 			// priority are chosen
-			desc: "Preemption with allocs across multiple devices that match",
+			desc: "Preemption with lower/higher priority combinations",
 			currentAllocations: []*structs.Allocation{
 				createAllocWithDevice(allocIDs[0], lowPrioJob, &structs.Resources{
 					CPU:      500,
 					MemoryMB: 512,
 					DiskMB:   4 * 1024,
 				}, &structs.AllocatedDeviceResource{
 					Type:      "gpu",
 					Vendor:    "nvidia",
 					Name:      "1080ti",
 					DeviceIDs: []string{deviceIDs[0], deviceIDs[1]},
 				}),
 				createAllocWithDevice(allocIDs[1], lowPrioJob2, &structs.Resources{
 					CPU:      200,
 					MemoryMB: 100,
 					DiskMB:   4 * 1024,
 				}, &structs.AllocatedDeviceResource{
 					Type:      "gpu",
 					Vendor:    "nvidia",
 					Name:      "1080ti",
 					DeviceIDs: []string{deviceIDs[2], deviceIDs[3]},
 				}),
 				createAllocWithDevice(allocIDs[2], lowPrioJob, &structs.Resources{
 					CPU:      200,
 					MemoryMB: 256,
 					DiskMB:   4 * 1024,
 				}, &structs.AllocatedDeviceResource{
 					Type:      "gpu",
 					Vendor:    "nvidia",
 					Name:      "2080ti",
 					DeviceIDs: []string{deviceIDs[4], deviceIDs[5]},
 				}),
 				createAllocWithDevice(allocIDs[3], lowPrioJob, &structs.Resources{
 					CPU:      100,
 					MemoryMB: 256,
 					DiskMB:   4 * 1024,
 				}, &structs.AllocatedDeviceResource{
 					Type:      "gpu",
 					Vendor:    "nvidia",
 					Name:      "2080ti",
 					DeviceIDs: []string{deviceIDs[6], deviceIDs[7]},
 				}),
 				createAllocWithDevice(allocIDs[4], lowPrioJob, &structs.Resources{
+					CPU:      100,
+					MemoryMB: 256,
+					DiskMB:   4 * 1024,
+				}, &structs.AllocatedDeviceResource{
+					Type:      "gpu",
+					Vendor:    "nvidia",
+					Name:      "2080ti",
+					DeviceIDs: []string{deviceIDs[8]},
+				}),
+				createAllocWithDevice(allocIDs[5], lowPrioJob, &structs.Resources{
 					CPU:      200,
 					MemoryMB: 512,
 					DiskMB:   4 * 1024,
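
A note on the net effect of this series: after patches 03, 09, and 10,
PreemptForDevice groups candidate allocations by the device ID they occupy,
builds one preemption option per device that can cover the ask, and passes
the options to selectBestAllocs, which sorts each option by instance count,
drops allocations that are no longer needed once the asked-for count is met,
and returns the option with the lowest net priority (the sum of unique job
priorities). The self-contained Go sketch below illustrates only that
selection step, mirroring the "Preemption with lower/higher priority
combinations" test case. It is a simplified illustration, not the Nomad
implementation: alloc, selectBest, and the literal values are hypothetical
stand-ins for *structs.Allocation, selectBestAllocs, and the per-device
instance counts tracked by deviceGroupAllocs.

package main

import (
	"fmt"
	"math"
	"sort"
)

// alloc is a simplified allocation: an ID, its job priority, and the number
// of device instances it holds on the device being considered.
type alloc struct {
	ID        string
	Priority  int
	Instances int
}

// selectBest mirrors the shape of selectBestAllocs: among per-device
// preemption options, pick the set with the smallest net priority (the sum
// of unique priorities), filtering out allocations that are not needed once
// the asked-for instance count has been reached.
func selectBest(options [][]alloc, neededCount int) []alloc {
	bestPriority := math.MaxInt32
	var best []alloc

	for _, option := range options {
		// Consider allocations holding more instances first so that fewer
		// allocations are preempted overall.
		sort.Slice(option, func(i, j int) bool {
			return option[i].Instances > option[j].Instances
		})

		var filtered []alloc
		seen := map[int]struct{}{}
		netPriority, count := 0, 0
		for _, a := range option {
			if count >= neededCount {
				break // enough instances freed; skip the rest
			}
			count += a.Instances
			filtered = append(filtered, a)
			if _, ok := seen[a.Priority]; !ok {
				seen[a.Priority] = struct{}{}
				netPriority += a.Priority
			}
		}
		if netPriority < bestPriority {
			bestPriority = netPriority
			best = filtered
		}
	}
	return best
}

func main() {
	// Two devices could each satisfy a 4-instance ask: the first by
	// preempting priority-30 and priority-40 allocations (net priority 70),
	// the second by preempting two priority-30 allocations (net priority 30).
	options := [][]alloc{
		{{ID: "a1", Priority: 30, Instances: 2}, {ID: "a2", Priority: 40, Instances: 2}},
		{{ID: "b1", Priority: 30, Instances: 2}, {ID: "b2", Priority: 30, Instances: 2}},
	}
	for _, a := range selectBest(options, 4) {
		fmt.Println("would preempt", a.ID)
	}
}

With a needed count of 4, the sketch selects the two priority-30 allocations
(net priority 30) over the 30-plus-40 pair (net priority 70), matching the
preemptedAllocIDs expected in that test case.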