Upsert Job Histories

This commit is contained in:
Alex Dadgar
2017-04-13 13:54:57 -07:00
parent 4b244219f9
commit 0c94d4822b
7 changed files with 253 additions and 17 deletions

View File

@@ -297,6 +297,8 @@ type Job struct {
VaultToken *string `mapstructure:"vault_token"`
Status *string
StatusDescription *string
Stable *bool
Version *uint64
CreateIndex *uint64
ModifyIndex *uint64
JobModifyIndex *uint64
@@ -343,6 +345,12 @@ func (j *Job) Canonicalize() {
if j.StatusDescription == nil {
j.StatusDescription = helper.StringToPtr("")
}
if j.Stable == nil {
j.Stable = helper.BoolToPtr(false)
}
if j.Version == nil {
j.Version = helper.Uint64ToPtr(0)
}
if j.CreateIndex == nil {
j.CreateIndex = helper.Uint64ToPtr(0)
}

View File

@@ -372,22 +372,17 @@ func ApiJobToStructJob(job *api.Job) *structs.Job {
job.Canonicalize()
j := &structs.Job{
Region: *job.Region,
ID: *job.ID,
ParentID: *job.ParentID,
Name: *job.Name,
Type: *job.Type,
Priority: *job.Priority,
AllAtOnce: *job.AllAtOnce,
Datacenters: job.Datacenters,
Payload: job.Payload,
Meta: job.Meta,
VaultToken: *job.VaultToken,
Status: *job.Status,
StatusDescription: *job.StatusDescription,
CreateIndex: *job.CreateIndex,
ModifyIndex: *job.ModifyIndex,
JobModifyIndex: *job.JobModifyIndex,
Region: *job.Region,
ID: *job.ID,
ParentID: *job.ParentID,
Name: *job.Name,
Type: *job.Type,
Priority: *job.Priority,
AllAtOnce: *job.AllAtOnce,
Datacenters: job.Datacenters,
Payload: job.Payload,
Meta: job.Meta,
VaultToken: *job.VaultToken,
}
j.Constraints = make([]*structs.Constraint, len(job.Constraints))

View File

@@ -146,6 +146,7 @@ func Job() *structs.Job {
"owner": "armon",
},
Status: structs.JobStatusPending,
Version: 0,
CreateIndex: 42,
ModifyIndex: 99,
JobModifyIndex: 99,

View File

@@ -158,7 +158,7 @@ func jobHistorySchema() *memdb.TableSchema {
Indexer: &memdb.CompoundIndex{
Indexes: []memdb.Indexer{
&memdb.StringFieldIndex{
Field: "JobID",
Field: "ID",
Lowercase: true,
},

View File

@@ -4,6 +4,7 @@ import (
"fmt"
"io"
"log"
"sort"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/nomad/structs"
@@ -311,6 +312,7 @@ func (s *StateStore) UpsertJob(index uint64, job *structs.Job) error {
job.CreateIndex = existing.(*structs.Job).CreateIndex
job.ModifyIndex = index
job.JobModifyIndex = index
job.Version = existing.(*structs.Job).Version + 1
// Compute the job status
var err error
@@ -322,6 +324,7 @@ func (s *StateStore) UpsertJob(index uint64, job *structs.Job) error {
job.CreateIndex = index
job.ModifyIndex = index
job.JobModifyIndex = index
job.Version = 0
if err := s.setJobStatus(index, txn, job, false, ""); err != nil {
return fmt.Errorf("setting job status for %q failed: %v", job.ID, err)
@@ -341,6 +344,10 @@ func (s *StateStore) UpsertJob(index uint64, job *structs.Job) error {
return fmt.Errorf("unable to create job summary: %v", err)
}
if err := s.upsertJobHistory(index, job, txn); err != nil {
return fmt.Errorf("unable to upsert job into job_histories table: %v", err)
}
// Create the EphemeralDisk if it's nil by adding up DiskMB from task resources.
// COMPAT 0.4.1 -> 0.5
s.addEphemeralDiskToTaskGroups(job)
@@ -437,6 +444,55 @@ func (s *StateStore) DeleteJob(index uint64, jobID string) error {
return nil
}
// upsertJobHistory inserts a job into its historic table and limits the number
// of historic jobs that are tracked.
func (s *StateStore) upsertJobHistory(index uint64, job *structs.Job, txn *memdb.Txn) error {
// Insert the job
if err := txn.Insert("job_histories", job); err != nil {
return fmt.Errorf("failed to insert job into job_histories table: %v", err)
}
if err := txn.Insert("index", &IndexEntry{"job_histories", index}); err != nil {
return fmt.Errorf("index update failed: %v", err)
}
// Get all the historic jobs for this ID
all, err := s.jobHistoryByID(txn, nil, job.ID)
if err != nil {
return fmt.Errorf("failed to look up job history for %q: %v", job.ID, err)
}
// If we are below the limit there is no GCing to be done
if len(all) <= structs.JobDefaultHistoricCount {
return nil
}
// We have to delete a historic job to make room.
// Find index of the highest versioned stable job
stableIdx := -1
for i, j := range all {
if j.Stable {
stableIdx = i
break
}
}
// If the stable job is the oldest version, do a swap to bring it into the
// keep set.
max := structs.JobDefaultHistoricCount
if stableIdx == max {
all[max-1], all[max] = all[max], all[max-1]
}
// Delete the job outside of the set that are being kept.
d := all[max]
if err := txn.Delete("job_histories", d); err != nil {
return fmt.Errorf("failed to delete job %v (%d) from job_histories", d.ID, d.Version)
}
return nil
}
// JobByID is used to lookup a job by its ID
func (s *StateStore) JobByID(ws memdb.WatchSet, id string) (*structs.Job, error) {
txn := s.db.Txn(false)
@@ -467,6 +523,50 @@ func (s *StateStore) JobsByIDPrefix(ws memdb.WatchSet, id string) (memdb.ResultI
return iter, nil
}
// JobHistoryByID returns all the tracked versions of a job.
func (s *StateStore) JobHistoryByID(ws memdb.WatchSet, id string) ([]*structs.Job, error) {
txn := s.db.Txn(false)
return s.jobHistoryByID(txn, &ws, id)
}
// jobHistoryByID is the underlying implementation for retrieving all tracked
// versions of a job and is called under an existing transaction. A watch set
// can optionally be passed in to add the job histories to the watch set.
func (s *StateStore) jobHistoryByID(txn *memdb.Txn, ws *memdb.WatchSet, id string) ([]*structs.Job, error) {
// Get all the historic jobs for this ID
iter, err := txn.Get("job_histories", "id_prefix", id)
if err != nil {
return nil, err
}
if ws != nil {
ws.Add(iter.WatchCh())
}
var all []*structs.Job
for {
raw := iter.Next()
if raw == nil {
break
}
// Ensure the ID is an exact match
j := raw.(*structs.Job)
if j.ID != id {
continue
}
all = append(all, j)
}
// Sort with highest versions first
sort.Slice(all, func(i, j int) bool {
return all[i].Version >= all[j].Version
})
return all, nil
}
// Jobs returns an iterator over all the jobs
func (s *StateStore) Jobs(ws memdb.WatchSet) (memdb.ResultIterator, error) {
txn := s.db.Txn(false)

View File

@@ -420,6 +420,19 @@ func TestStateStore_UpsertJob_Job(t *testing.T) {
if watchFired(ws) {
t.Fatalf("bad")
}
// Check the job history
allVersions, err := state.JobHistoryByID(ws, job.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if len(allVersions) != 1 {
t.Fatalf("got %d; want 1", len(allVersions))
}
if a := allVersions[0]; a.ID != job.ID || a.Version != 0 {
t.Fatalf("bad: %v", a)
}
}
func TestStateStore_UpdateUpsertJob_Job(t *testing.T) {
@@ -439,6 +452,7 @@ func TestStateStore_UpdateUpsertJob_Job(t *testing.T) {
job2 := mock.Job()
job2.ID = job.ID
job2.AllAtOnce = true
err = state.UpsertJob(1001, job2)
if err != nil {
t.Fatalf("err: %v", err)
@@ -490,6 +504,22 @@ func TestStateStore_UpdateUpsertJob_Job(t *testing.T) {
t.Fatalf("nil summary for task group")
}
// Check the job history
allVersions, err := state.JobHistoryByID(ws, job.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if len(allVersions) != 2 {
t.Fatalf("got %d; want 1", len(allVersions))
}
if a := allVersions[0]; a.ID != job.ID || a.Version != 1 || !a.AllAtOnce {
t.Fatalf("bad: %+v", a)
}
if a := allVersions[1]; a.ID != job.ID || a.Version != 0 || a.AllAtOnce {
t.Fatalf("bad: %+v", a)
}
if watchFired(ws) {
t.Fatalf("bad")
}
@@ -628,6 +658,95 @@ func TestStateStore_UpsertJob_ChildJob(t *testing.T) {
}
}
func TestStateStore_UpdateUpsertJob_JobHistory(t *testing.T) {
state := testStateStore(t)
// Create a job and mark it as stable
job := mock.Job()
job.Stable = true
job.Priority = 0
// Create a watchset so we can test that upsert fires the watch
ws := memdb.NewWatchSet()
_, err := state.JobHistoryByID(ws, job.ID)
if err != nil {
t.Fatalf("bad: %v", err)
}
if err := state.UpsertJob(1000, job); err != nil {
t.Fatalf("err: %v", err)
}
if !watchFired(ws) {
t.Fatalf("bad")
}
var finalJob *structs.Job
for i := 1; i < 20; i++ {
finalJob = mock.Job()
finalJob.ID = job.ID
finalJob.Priority = i
err = state.UpsertJob(uint64(1000+i), finalJob)
if err != nil {
t.Fatalf("err: %v", err)
}
}
ws = memdb.NewWatchSet()
out, err := state.JobByID(ws, job.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if !reflect.DeepEqual(finalJob, out) {
t.Fatalf("bad: %#v %#v", finalJob, out)
}
if out.CreateIndex != 1000 {
t.Fatalf("bad: %#v", out)
}
if out.ModifyIndex != 1019 {
t.Fatalf("bad: %#v", out)
}
if out.Version != 19 {
t.Fatalf("bad: %#v", out)
}
index, err := state.Index("job_histories")
if err != nil {
t.Fatalf("err: %v", err)
}
if index != 1019 {
t.Fatalf("bad: %d", index)
}
// Check the job history
allVersions, err := state.JobHistoryByID(ws, job.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if len(allVersions) != structs.JobDefaultHistoricCount {
t.Fatalf("got %d; want 1", len(allVersions))
}
if a := allVersions[0]; a.ID != job.ID || a.Version != 19 || a.Priority != 19 {
t.Fatalf("bad: %+v", a)
}
if a := allVersions[1]; a.ID != job.ID || a.Version != 18 || a.Priority != 18 {
t.Fatalf("bad: %+v", a)
}
// Ensure we didn't delete the stable job
if a := allVersions[structs.JobDefaultHistoricCount-1]; a.ID != job.ID ||
a.Version != 0 || a.Priority != 0 || !a.Stable {
t.Fatalf("bad: %+v", a)
}
if watchFired(ws) {
t.Fatalf("bad")
}
}
func TestStateStore_DeleteJob_Job(t *testing.T) {
state := testStateStore(t)
job := mock.Job()

View File

@@ -1097,6 +1097,10 @@ const (
// specified job so that it gets priority. This is important
// for the system to remain healthy.
CoreJobPriority = JobMaxPriority * 2
// JobDefaultHistoricCount is the number of historic job versions that are
// kept.
JobDefaultHistoricCount = 6
)
// Job is the scope of a scheduling request to Nomad. It is the largest
@@ -1172,6 +1176,15 @@ type Job struct {
// StatusDescription is meant to provide more human useful information
StatusDescription string
// Stable marks a job as stable. Stability is only defined on "service" and
// "system" jobs. The stability of a job will be set automatically as part
// of a deployment and can be manually set via APIs.
Stable bool
// Version is a monitonically increasing version number that is incremened
// on each job register.
Version uint64
// Raft Indexes
CreateIndex uint64
ModifyIndex uint64