From 8dcbeeed8fcb127bcb2ecef974c189a343979c08 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Thu, 1 Sep 2016 17:41:50 -0700 Subject: [PATCH] Adding LocalDisk to alloc.Job --- nomad/state/state_store.go | 91 ++++++++++++++++++++------------- nomad/state/state_store_test.go | 59 +++++++++++++++++++++ 2 files changed, 115 insertions(+), 35 deletions(-) diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index eca57db93..557538412 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -357,23 +357,9 @@ func (s *StateStore) UpsertJob(index uint64, job *structs.Job) error { return fmt.Errorf("unable to create job summary: %v", err) } - // COMPAT 0.4.1 -> 0.5 Create the LocalDisk if it's nil by adding up DiskMB - // from task resources - for _, tg := range job.TaskGroups { - if tg.LocalDisk != nil { - continue - } - var diskMB int - for _, task := range tg.Tasks { - if task.Resources != nil { - diskMB += task.Resources.DiskMB - task.Resources.DiskMB = 0 - } - } - tg.LocalDisk = &structs.LocalDisk{ - DiskMB: diskMB, - } - } + // Create the LocalDisk if it's nil by adding up DiskMB from task resources. + // COMPAT 0.4.1 -> 0.5 + s.addLocalDiskToTaskGroups(job) // Insert the job if err := txn.Insert("jobs", job); err != nil { @@ -975,6 +961,12 @@ func (s *StateStore) UpsertAllocs(index uint64, allocs []*structs.Allocation) er return fmt.Errorf("error updating job summary: %v", err) } + // Create the LocalDisk if it's nil by adding up DiskMB from task resources. + // COMPAT 0.4.1 -> 0.5 + if alloc.Job != nil { + s.addLocalDiskToTaskGroups(alloc.Job) + } + if err := txn.Insert("allocs", alloc); err != nil { return fmt.Errorf("alloc insert failed: %v", err) } @@ -1664,6 +1656,25 @@ func (s *StateStore) updateSummaryWithAlloc(index uint64, alloc *structs.Allocat return nil } +// addLocalDiskToTaskGroups adds missing LocalDisk objects to TaskGroups +func (s *StateStore) addLocalDiskToTaskGroups(job *structs.Job) { + for _, tg := range job.TaskGroups { + if tg.LocalDisk != nil { + continue + } + var diskMB int + for _, task := range tg.Tasks { + if task.Resources != nil { + diskMB += task.Resources.DiskMB + task.Resources.DiskMB = 0 + } + } + tg.LocalDisk = &structs.LocalDisk{ + DiskMB: diskMB, + } + } +} + // StateSnapshot is used to provide a point-in-time snapshot type StateSnapshot struct { StateStore @@ -1704,23 +1715,9 @@ func (r *StateRestore) JobRestore(job *structs.Job) error { r.items.Add(watch.Item{Table: "jobs"}) r.items.Add(watch.Item{Job: job.ID}) - // COMPAT 0.4.1 -> 0.5 Create the LocalDisk if it's nil by adding up DiskMB - // from task resources - for _, tg := range job.TaskGroups { - if tg.LocalDisk != nil { - continue - } - var diskMB int - for _, task := range tg.Tasks { - if task.Resources != nil { - diskMB += task.Resources.DiskMB - task.Resources.DiskMB = 0 - } - } - tg.LocalDisk = &structs.LocalDisk{ - DiskMB: diskMB, - } - } + // Create the LocalDisk if it's nil by adding up DiskMB from task resources. + // COMPAT 0.4.1 -> 0.5 + r.addLocalDiskToTaskGroups(job) if err := r.txn.Insert("jobs", job); err != nil { return fmt.Errorf("job insert failed: %v", err) @@ -1746,14 +1743,19 @@ func (r *StateRestore) AllocRestore(alloc *structs.Allocation) error { r.items.Add(watch.Item{AllocJob: alloc.JobID}) r.items.Add(watch.Item{AllocNode: alloc.NodeID}) - //COMPAT 0.4.1 -> 0.5 // Set the shared resources if it's not present + // COMPAT 0.4.1 -> 0.5 if alloc.SharedResources == nil { alloc.SharedResources = &structs.Resources{ DiskMB: alloc.Resources.DiskMB, } } + // Create the LocalDisk if it's nil by adding up DiskMB from task resources. + if alloc.Job != nil { + r.addLocalDiskToTaskGroups(alloc.Job) + } + if err := r.txn.Insert("allocs", alloc); err != nil { return fmt.Errorf("alloc insert failed: %v", err) } @@ -1794,6 +1796,25 @@ func (r *StateRestore) VaultAccessorRestore(accessor *structs.VaultAccessor) err return nil } +// addLocalDiskToTaskGroups adds missing LocalDisk objects to TaskGroups +func (r *StateRestore) addLocalDiskToTaskGroups(job *structs.Job) { + for _, tg := range job.TaskGroups { + if tg.LocalDisk != nil { + continue + } + var diskMB int + for _, task := range tg.Tasks { + if task.Resources != nil { + diskMB += task.Resources.DiskMB + task.Resources.DiskMB = 0 + } + } + tg.LocalDisk = &structs.LocalDisk{ + DiskMB: diskMB, + } + } +} + // stateWatch holds shared state for watching updates. This is // outside of StateStore so it can be shared with snapshots. type stateWatch struct { diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index f4808c8c5..8b7387d7c 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -1792,6 +1792,33 @@ func TestStateStore_UpsertAlloc_Alloc(t *testing.T) { notify.verify(t) } +func TestStateStore_UpsertAlloc_NoLocalDisk(t *testing.T) { + state := testStateStore(t) + alloc := mock.Alloc() + alloc.Job.TaskGroups[0].LocalDisk = nil + alloc.Job.TaskGroups[0].Tasks[0].Resources.DiskMB = 120 + + if err := state.UpsertJob(999, alloc.Job); err != nil { + t.Fatalf("err: %v", err) + } + + err := state.UpsertAllocs(1000, []*structs.Allocation{alloc}) + if err != nil { + t.Fatalf("err: %v", err) + } + + out, err := state.AllocByID(alloc.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + + expected := alloc.Copy() + expected.Job.TaskGroups[0].LocalDisk = &structs.LocalDisk{DiskMB: 120} + if !reflect.DeepEqual(expected, out) { + t.Fatalf("bad: %#v %#v", expected, out) + } +} + func TestStateStore_UpdateAlloc_Alloc(t *testing.T) { state := testStateStore(t) alloc := mock.Alloc() @@ -2489,6 +2516,38 @@ func TestStateStore_RestoreAlloc(t *testing.T) { notify.verify(t) } +func TestStateStore_RestoreAlloc_NoLocalDisk(t *testing.T) { + state := testStateStore(t) + alloc := mock.Alloc() + alloc.Job.TaskGroups[0].LocalDisk = nil + alloc.Job.TaskGroups[0].Tasks[0].Resources.DiskMB = 120 + + restore, err := state.Restore() + if err != nil { + t.Fatalf("err: %v", err) + } + + err = restore.AllocRestore(alloc) + if err != nil { + t.Fatalf("err: %v", err) + } + + restore.Commit() + + out, err := state.AllocByID(alloc.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + + expected := alloc.Copy() + expected.Job.TaskGroups[0].LocalDisk = &structs.LocalDisk{DiskMB: 120} + expected.Job.TaskGroups[0].Tasks[0].Resources.DiskMB = 0 + + if !reflect.DeepEqual(out, expected) { + t.Fatalf("Bad: %#v %#v", out, expected) + } +} + func TestStateStore_SetJobStatus_ForceStatus(t *testing.T) { state := testStateStore(t) watcher := watch.NewItems()