Adding LocalDisk to alloc.Job

This commit is contained in:
Diptanu Choudhury
2016-09-01 17:41:50 -07:00
parent e2c179821e
commit 8dcbeeed8f
2 changed files with 115 additions and 35 deletions

View File

@@ -357,23 +357,9 @@ func (s *StateStore) UpsertJob(index uint64, job *structs.Job) error {
return fmt.Errorf("unable to create job summary: %v", err)
}
// COMPAT 0.4.1 -> 0.5 Create the LocalDisk if it's nil by adding up DiskMB
// from task resources
for _, tg := range job.TaskGroups {
if tg.LocalDisk != nil {
continue
}
var diskMB int
for _, task := range tg.Tasks {
if task.Resources != nil {
diskMB += task.Resources.DiskMB
task.Resources.DiskMB = 0
}
}
tg.LocalDisk = &structs.LocalDisk{
DiskMB: diskMB,
}
}
// Create the LocalDisk if it's nil by adding up DiskMB from task resources.
// COMPAT 0.4.1 -> 0.5
s.addLocalDiskToTaskGroups(job)
// Insert the job
if err := txn.Insert("jobs", job); err != nil {
@@ -975,6 +961,12 @@ func (s *StateStore) UpsertAllocs(index uint64, allocs []*structs.Allocation) er
return fmt.Errorf("error updating job summary: %v", err)
}
// Create the LocalDisk if it's nil by adding up DiskMB from task resources.
// COMPAT 0.4.1 -> 0.5
if alloc.Job != nil {
s.addLocalDiskToTaskGroups(alloc.Job)
}
if err := txn.Insert("allocs", alloc); err != nil {
return fmt.Errorf("alloc insert failed: %v", err)
}
@@ -1664,6 +1656,25 @@ func (s *StateStore) updateSummaryWithAlloc(index uint64, alloc *structs.Allocat
return nil
}
// addLocalDiskToTaskGroups adds missing LocalDisk objects to TaskGroups
func (s *StateStore) addLocalDiskToTaskGroups(job *structs.Job) {
for _, tg := range job.TaskGroups {
if tg.LocalDisk != nil {
continue
}
var diskMB int
for _, task := range tg.Tasks {
if task.Resources != nil {
diskMB += task.Resources.DiskMB
task.Resources.DiskMB = 0
}
}
tg.LocalDisk = &structs.LocalDisk{
DiskMB: diskMB,
}
}
}
// StateSnapshot is used to provide a point-in-time snapshot
type StateSnapshot struct {
StateStore
@@ -1704,23 +1715,9 @@ func (r *StateRestore) JobRestore(job *structs.Job) error {
r.items.Add(watch.Item{Table: "jobs"})
r.items.Add(watch.Item{Job: job.ID})
// COMPAT 0.4.1 -> 0.5 Create the LocalDisk if it's nil by adding up DiskMB
// from task resources
for _, tg := range job.TaskGroups {
if tg.LocalDisk != nil {
continue
}
var diskMB int
for _, task := range tg.Tasks {
if task.Resources != nil {
diskMB += task.Resources.DiskMB
task.Resources.DiskMB = 0
}
}
tg.LocalDisk = &structs.LocalDisk{
DiskMB: diskMB,
}
}
// Create the LocalDisk if it's nil by adding up DiskMB from task resources.
// COMPAT 0.4.1 -> 0.5
r.addLocalDiskToTaskGroups(job)
if err := r.txn.Insert("jobs", job); err != nil {
return fmt.Errorf("job insert failed: %v", err)
@@ -1746,14 +1743,19 @@ func (r *StateRestore) AllocRestore(alloc *structs.Allocation) error {
r.items.Add(watch.Item{AllocJob: alloc.JobID})
r.items.Add(watch.Item{AllocNode: alloc.NodeID})
//COMPAT 0.4.1 -> 0.5
// Set the shared resources if it's not present
// COMPAT 0.4.1 -> 0.5
if alloc.SharedResources == nil {
alloc.SharedResources = &structs.Resources{
DiskMB: alloc.Resources.DiskMB,
}
}
// Create the LocalDisk if it's nil by adding up DiskMB from task resources.
if alloc.Job != nil {
r.addLocalDiskToTaskGroups(alloc.Job)
}
if err := r.txn.Insert("allocs", alloc); err != nil {
return fmt.Errorf("alloc insert failed: %v", err)
}
@@ -1794,6 +1796,25 @@ func (r *StateRestore) VaultAccessorRestore(accessor *structs.VaultAccessor) err
return nil
}
// addLocalDiskToTaskGroups adds missing LocalDisk objects to TaskGroups
func (r *StateRestore) addLocalDiskToTaskGroups(job *structs.Job) {
for _, tg := range job.TaskGroups {
if tg.LocalDisk != nil {
continue
}
var diskMB int
for _, task := range tg.Tasks {
if task.Resources != nil {
diskMB += task.Resources.DiskMB
task.Resources.DiskMB = 0
}
}
tg.LocalDisk = &structs.LocalDisk{
DiskMB: diskMB,
}
}
}
// stateWatch holds shared state for watching updates. This is
// outside of StateStore so it can be shared with snapshots.
type stateWatch struct {

View File

@@ -1792,6 +1792,33 @@ func TestStateStore_UpsertAlloc_Alloc(t *testing.T) {
notify.verify(t)
}
func TestStateStore_UpsertAlloc_NoLocalDisk(t *testing.T) {
state := testStateStore(t)
alloc := mock.Alloc()
alloc.Job.TaskGroups[0].LocalDisk = nil
alloc.Job.TaskGroups[0].Tasks[0].Resources.DiskMB = 120
if err := state.UpsertJob(999, alloc.Job); err != nil {
t.Fatalf("err: %v", err)
}
err := state.UpsertAllocs(1000, []*structs.Allocation{alloc})
if err != nil {
t.Fatalf("err: %v", err)
}
out, err := state.AllocByID(alloc.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
expected := alloc.Copy()
expected.Job.TaskGroups[0].LocalDisk = &structs.LocalDisk{DiskMB: 120}
if !reflect.DeepEqual(expected, out) {
t.Fatalf("bad: %#v %#v", expected, out)
}
}
func TestStateStore_UpdateAlloc_Alloc(t *testing.T) {
state := testStateStore(t)
alloc := mock.Alloc()
@@ -2489,6 +2516,38 @@ func TestStateStore_RestoreAlloc(t *testing.T) {
notify.verify(t)
}
func TestStateStore_RestoreAlloc_NoLocalDisk(t *testing.T) {
state := testStateStore(t)
alloc := mock.Alloc()
alloc.Job.TaskGroups[0].LocalDisk = nil
alloc.Job.TaskGroups[0].Tasks[0].Resources.DiskMB = 120
restore, err := state.Restore()
if err != nil {
t.Fatalf("err: %v", err)
}
err = restore.AllocRestore(alloc)
if err != nil {
t.Fatalf("err: %v", err)
}
restore.Commit()
out, err := state.AllocByID(alloc.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
expected := alloc.Copy()
expected.Job.TaskGroups[0].LocalDisk = &structs.LocalDisk{DiskMB: 120}
expected.Job.TaskGroups[0].Tasks[0].Resources.DiskMB = 0
if !reflect.DeepEqual(out, expected) {
t.Fatalf("Bad: %#v %#v", out, expected)
}
}
func TestStateStore_SetJobStatus_ForceStatus(t *testing.T) {
state := testStateStore(t)
watcher := watch.NewItems()