From 0c94d4822bce70dfdb03e6870764af41717cdbac Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Thu, 13 Apr 2017 13:54:57 -0700 Subject: [PATCH] Upsert Job Histories --- api/jobs.go | 8 +++ command/agent/job_endpoint.go | 27 +++----- nomad/mock/mock.go | 1 + nomad/state/schema.go | 2 +- nomad/state/state_store.go | 100 +++++++++++++++++++++++++++ nomad/state/state_store_test.go | 119 ++++++++++++++++++++++++++++++++ nomad/structs/structs.go | 13 ++++ 7 files changed, 253 insertions(+), 17 deletions(-) diff --git a/api/jobs.go b/api/jobs.go index 90cb11614..91829aaca 100644 --- a/api/jobs.go +++ b/api/jobs.go @@ -297,6 +297,8 @@ type Job struct { VaultToken *string `mapstructure:"vault_token"` Status *string StatusDescription *string + Stable *bool + Version *uint64 CreateIndex *uint64 ModifyIndex *uint64 JobModifyIndex *uint64 @@ -343,6 +345,12 @@ func (j *Job) Canonicalize() { if j.StatusDescription == nil { j.StatusDescription = helper.StringToPtr("") } + if j.Stable == nil { + j.Stable = helper.BoolToPtr(false) + } + if j.Version == nil { + j.Version = helper.Uint64ToPtr(0) + } if j.CreateIndex == nil { j.CreateIndex = helper.Uint64ToPtr(0) } diff --git a/command/agent/job_endpoint.go b/command/agent/job_endpoint.go index 5ad4bde83..9e4086acf 100644 --- a/command/agent/job_endpoint.go +++ b/command/agent/job_endpoint.go @@ -372,22 +372,17 @@ func ApiJobToStructJob(job *api.Job) *structs.Job { job.Canonicalize() j := &structs.Job{ - Region: *job.Region, - ID: *job.ID, - ParentID: *job.ParentID, - Name: *job.Name, - Type: *job.Type, - Priority: *job.Priority, - AllAtOnce: *job.AllAtOnce, - Datacenters: job.Datacenters, - Payload: job.Payload, - Meta: job.Meta, - VaultToken: *job.VaultToken, - Status: *job.Status, - StatusDescription: *job.StatusDescription, - CreateIndex: *job.CreateIndex, - ModifyIndex: *job.ModifyIndex, - JobModifyIndex: *job.JobModifyIndex, + Region: *job.Region, + ID: *job.ID, + ParentID: *job.ParentID, + Name: *job.Name, + Type: *job.Type, + 
Priority: *job.Priority, + AllAtOnce: *job.AllAtOnce, + Datacenters: job.Datacenters, + Payload: job.Payload, + Meta: job.Meta, + VaultToken: *job.VaultToken, } j.Constraints = make([]*structs.Constraint, len(job.Constraints)) diff --git a/nomad/mock/mock.go b/nomad/mock/mock.go index 8214c8ca9..8191b00f6 100644 --- a/nomad/mock/mock.go +++ b/nomad/mock/mock.go @@ -146,6 +146,7 @@ func Job() *structs.Job { "owner": "armon", }, Status: structs.JobStatusPending, + Version: 0, CreateIndex: 42, ModifyIndex: 99, JobModifyIndex: 99, diff --git a/nomad/state/schema.go b/nomad/state/schema.go index 535024c7c..b0ef44b27 100644 --- a/nomad/state/schema.go +++ b/nomad/state/schema.go @@ -158,7 +158,7 @@ func jobHistorySchema() *memdb.TableSchema { Indexer: &memdb.CompoundIndex{ Indexes: []memdb.Indexer{ &memdb.StringFieldIndex{ - Field: "JobID", + Field: "ID", Lowercase: true, }, diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index a5235d345..a0dc8c38d 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -4,6 +4,7 @@ import ( "fmt" "io" "log" + "sort" "github.com/hashicorp/go-memdb" "github.com/hashicorp/nomad/nomad/structs" @@ -311,6 +312,7 @@ func (s *StateStore) UpsertJob(index uint64, job *structs.Job) error { job.CreateIndex = existing.(*structs.Job).CreateIndex job.ModifyIndex = index job.JobModifyIndex = index + job.Version = existing.(*structs.Job).Version + 1 // Compute the job status var err error @@ -322,6 +324,7 @@ func (s *StateStore) UpsertJob(index uint64, job *structs.Job) error { job.CreateIndex = index job.ModifyIndex = index job.JobModifyIndex = index + job.Version = 0 if err := s.setJobStatus(index, txn, job, false, ""); err != nil { return fmt.Errorf("setting job status for %q failed: %v", job.ID, err) @@ -341,6 +344,10 @@ func (s *StateStore) UpsertJob(index uint64, job *structs.Job) error { return fmt.Errorf("unable to create job summary: %v", err) } + if err := s.upsertJobHistory(index, job, txn); err != nil 
{ + return fmt.Errorf("unable to upsert job into job_histories table: %v", err) + } + // Create the EphemeralDisk if it's nil by adding up DiskMB from task resources. // COMPAT 0.4.1 -> 0.5 s.addEphemeralDiskToTaskGroups(job) @@ -437,6 +444,55 @@ func (s *StateStore) DeleteJob(index uint64, jobID string) error { return nil } +// upsertJobHistory inserts a job into its historic table and limits the number +// of historic jobs that are tracked. +func (s *StateStore) upsertJobHistory(index uint64, job *structs.Job, txn *memdb.Txn) error { + // Insert the job + if err := txn.Insert("job_histories", job); err != nil { + return fmt.Errorf("failed to insert job into job_histories table: %v", err) + } + + if err := txn.Insert("index", &IndexEntry{"job_histories", index}); err != nil { + return fmt.Errorf("index update failed: %v", err) + } + + // Get all the historic jobs for this ID + all, err := s.jobHistoryByID(txn, nil, job.ID) + if err != nil { + return fmt.Errorf("failed to look up job history for %q: %v", job.ID, err) + } + + // If we are below the limit there is no GCing to be done + if len(all) <= structs.JobDefaultHistoricCount { + return nil + } + + // We have to delete a historic job to make room. + // Find index of the highest versioned stable job + stableIdx := -1 + for i, j := range all { + if j.Stable { + stableIdx = i + break + } + } + + // If the stable job is the oldest version, do a swap to bring it into the + // keep set. + max := structs.JobDefaultHistoricCount + if stableIdx == max { + all[max-1], all[max] = all[max], all[max-1] + } + + // Delete the job outside of the set that are being kept. 
+ d := all[max] + if err := txn.Delete("job_histories", d); err != nil { + return fmt.Errorf("failed to delete job %v (%d) from job_histories", d.ID, d.Version) + } + + return nil +} + // JobByID is used to lookup a job by its ID func (s *StateStore) JobByID(ws memdb.WatchSet, id string) (*structs.Job, error) { txn := s.db.Txn(false) @@ -467,6 +523,50 @@ func (s *StateStore) JobsByIDPrefix(ws memdb.WatchSet, id string) (memdb.ResultI return iter, nil } +// JobHistoryByID returns all the tracked versions of a job. +func (s *StateStore) JobHistoryByID(ws memdb.WatchSet, id string) ([]*structs.Job, error) { + txn := s.db.Txn(false) + return s.jobHistoryByID(txn, &ws, id) +} + +// jobHistoryByID is the underlying implementation for retrieving all tracked +// versions of a job and is called under an existing transaction. A watch set +// can optionally be passed in to add the job histories to the watch set. +func (s *StateStore) jobHistoryByID(txn *memdb.Txn, ws *memdb.WatchSet, id string) ([]*structs.Job, error) { + // Get all the historic jobs for this ID + iter, err := txn.Get("job_histories", "id_prefix", id) + if err != nil { + return nil, err + } + + if ws != nil { + ws.Add(iter.WatchCh()) + } + + var all []*structs.Job + for { + raw := iter.Next() + if raw == nil { + break + } + + // Ensure the ID is an exact match + j := raw.(*structs.Job) + if j.ID != id { + continue + } + + all = append(all, j) + } + + // Sort with highest versions first + sort.Slice(all, func(i, j int) bool { + return all[i].Version >= all[j].Version + }) + + return all, nil +} + // Jobs returns an iterator over all the jobs func (s *StateStore) Jobs(ws memdb.WatchSet) (memdb.ResultIterator, error) { txn := s.db.Txn(false) diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index f3d96c18b..687263346 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -420,6 +420,19 @@ func TestStateStore_UpsertJob_Job(t *testing.T) { if 
watchFired(ws) { t.Fatalf("bad") } + + // Check the job history + allVersions, err := state.JobHistoryByID(ws, job.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + if len(allVersions) != 1 { + t.Fatalf("got %d; want 1", len(allVersions)) + } + + if a := allVersions[0]; a.ID != job.ID || a.Version != 0 { + t.Fatalf("bad: %v", a) + } } func TestStateStore_UpdateUpsertJob_Job(t *testing.T) { @@ -439,6 +452,7 @@ func TestStateStore_UpdateUpsertJob_Job(t *testing.T) { job2 := mock.Job() job2.ID = job.ID + job2.AllAtOnce = true err = state.UpsertJob(1001, job2) if err != nil { t.Fatalf("err: %v", err) @@ -490,6 +504,22 @@ func TestStateStore_UpdateUpsertJob_Job(t *testing.T) { t.Fatalf("nil summary for task group") } + // Check the job history + allVersions, err := state.JobHistoryByID(ws, job.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + if len(allVersions) != 2 { + t.Fatalf("got %d; want 1", len(allVersions)) + } + + if a := allVersions[0]; a.ID != job.ID || a.Version != 1 || !a.AllAtOnce { + t.Fatalf("bad: %+v", a) + } + if a := allVersions[1]; a.ID != job.ID || a.Version != 0 || a.AllAtOnce { + t.Fatalf("bad: %+v", a) + } + if watchFired(ws) { t.Fatalf("bad") } @@ -628,6 +658,95 @@ func TestStateStore_UpsertJob_ChildJob(t *testing.T) { } } +func TestStateStore_UpdateUpsertJob_JobHistory(t *testing.T) { + state := testStateStore(t) + + // Create a job and mark it as stable + job := mock.Job() + job.Stable = true + job.Priority = 0 + + // Create a watchset so we can test that upsert fires the watch + ws := memdb.NewWatchSet() + _, err := state.JobHistoryByID(ws, job.ID) + if err != nil { + t.Fatalf("bad: %v", err) + } + + if err := state.UpsertJob(1000, job); err != nil { + t.Fatalf("err: %v", err) + } + + if !watchFired(ws) { + t.Fatalf("bad") + } + + var finalJob *structs.Job + for i := 1; i < 20; i++ { + finalJob = mock.Job() + finalJob.ID = job.ID + finalJob.Priority = i + err = state.UpsertJob(uint64(1000+i), finalJob) + if err != nil { + 
t.Fatalf("err: %v", err) + } + } + + ws = memdb.NewWatchSet() + out, err := state.JobByID(ws, job.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + + if !reflect.DeepEqual(finalJob, out) { + t.Fatalf("bad: %#v %#v", finalJob, out) + } + + if out.CreateIndex != 1000 { + t.Fatalf("bad: %#v", out) + } + if out.ModifyIndex != 1019 { + t.Fatalf("bad: %#v", out) + } + if out.Version != 19 { + t.Fatalf("bad: %#v", out) + } + + index, err := state.Index("job_histories") + if err != nil { + t.Fatalf("err: %v", err) + } + if index != 1019 { + t.Fatalf("bad: %d", index) + } + + // Check the job history + allVersions, err := state.JobHistoryByID(ws, job.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + if len(allVersions) != structs.JobDefaultHistoricCount { + t.Fatalf("got %d; want 1", len(allVersions)) + } + + if a := allVersions[0]; a.ID != job.ID || a.Version != 19 || a.Priority != 19 { + t.Fatalf("bad: %+v", a) + } + if a := allVersions[1]; a.ID != job.ID || a.Version != 18 || a.Priority != 18 { + t.Fatalf("bad: %+v", a) + } + + // Ensure we didn't delete the stable job + if a := allVersions[structs.JobDefaultHistoricCount-1]; a.ID != job.ID || + a.Version != 0 || a.Priority != 0 || !a.Stable { + t.Fatalf("bad: %+v", a) + } + + if watchFired(ws) { + t.Fatalf("bad") + } +} + func TestStateStore_DeleteJob_Job(t *testing.T) { state := testStateStore(t) job := mock.Job() diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 57376279c..43387329a 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -1097,6 +1097,10 @@ const ( // specified job so that it gets priority. This is important // for the system to remain healthy. CoreJobPriority = JobMaxPriority * 2 + + // JobDefaultHistoricCount is the number of historic job versions that are + // kept. + JobDefaultHistoricCount = 6 ) // Job is the scope of a scheduling request to Nomad. 
It is the largest @@ -1172,6 +1176,15 @@ type Job struct { // StatusDescription is meant to provide more human useful information StatusDescription string + // Stable marks a job as stable. Stability is only defined on "service" and + // "system" jobs. The stability of a job will be set automatically as part + // of a deployment and can be manually set via APIs. + Stable bool + + // Version is a monotonically increasing version number that is incremented + // on each job register. + Version uint64 + // Raft Indexes CreateIndex uint64 ModifyIndex uint64