Histories -> Versions

This commit is contained in:
Alex Dadgar
2017-04-13 14:54:22 -07:00
parent 0c94d4822b
commit 5c618ab56e
4 changed files with 38 additions and 38 deletions

View File

@@ -20,7 +20,7 @@ func stateStoreSchema() *memdb.DBSchema {
nodeTableSchema,
jobTableSchema,
jobSummarySchema,
jobHistorySchema,
jobVersionsSchema,
periodicLaunchTableSchema,
evalTableSchema,
allocTableSchema,
@@ -142,11 +142,11 @@ func jobSummarySchema() *memdb.TableSchema {
}
}
// jobHistorySchema returns the memdb schema for the job history table which
// keeps a historical view of jobs.
func jobHistorySchema() *memdb.TableSchema {
// jobVersionsSchema returns the memdb schema for the job version table which
// keeps a historical view of job versions.
func jobVersionsSchema() *memdb.TableSchema {
return &memdb.TableSchema{
Name: "job_histories",
Name: "job_versions",
Indexes: map[string]*memdb.IndexSchema{
"id": &memdb.IndexSchema{
Name: "id",

View File

@@ -344,8 +344,8 @@ func (s *StateStore) UpsertJob(index uint64, job *structs.Job) error {
return fmt.Errorf("unable to create job summary: %v", err)
}
if err := s.upsertJobHistory(index, job, txn); err != nil {
return fmt.Errorf("unable to upsert job into job_histories table: %v", err)
if err := s.upsertJobVersion(index, job, txn); err != nil {
return fmt.Errorf("unable to upsert job into job_versions table: %v", err)
}
// Create the EphemeralDisk if it's nil by adding up DiskMB from task resources.
@@ -444,26 +444,26 @@ func (s *StateStore) DeleteJob(index uint64, jobID string) error {
return nil
}
// upsertJobHistory inserts a job into its historic table and limits the number
// of historic jobs that are tracked.
func (s *StateStore) upsertJobHistory(index uint64, job *structs.Job, txn *memdb.Txn) error {
// upsertJobVersion inserts a job into its historic version table and limits the
// number of job versions that are tracked.
func (s *StateStore) upsertJobVersion(index uint64, job *structs.Job, txn *memdb.Txn) error {
// Insert the job
if err := txn.Insert("job_histories", job); err != nil {
return fmt.Errorf("failed to insert job into job_histories table: %v", err)
if err := txn.Insert("job_versions", job); err != nil {
return fmt.Errorf("failed to insert job into job_versions table: %v", err)
}
if err := txn.Insert("index", &IndexEntry{"job_histories", index}); err != nil {
if err := txn.Insert("index", &IndexEntry{"job_versions", index}); err != nil {
return fmt.Errorf("index update failed: %v", err)
}
// Get all the historic jobs for this ID
all, err := s.jobHistoryByID(txn, nil, job.ID)
all, err := s.jobVersionByID(txn, nil, job.ID)
if err != nil {
return fmt.Errorf("failed to look up job history for %q: %v", job.ID, err)
return fmt.Errorf("failed to look up job versions for %q: %v", job.ID, err)
}
// If we are below the limit there is no GCing to be done
if len(all) <= structs.JobDefaultHistoricCount {
if len(all) <= structs.JobTrackedVersions {
return nil
}
@@ -479,15 +479,15 @@ func (s *StateStore) upsertJobHistory(index uint64, job *structs.Job, txn *memdb
// If the stable job is the oldest version, do a swap to bring it into the
// keep set.
max := structs.JobDefaultHistoricCount
max := structs.JobTrackedVersions
if stableIdx == max {
all[max-1], all[max] = all[max], all[max-1]
}
// Delete the job outside of the set that are being kept.
d := all[max]
if err := txn.Delete("job_histories", d); err != nil {
return fmt.Errorf("failed to delete job %v (%d) from job_histories", d.ID, d.Version)
if err := txn.Delete("job_versions", d); err != nil {
return fmt.Errorf("failed to delete job %v (%d) from job_versions", d.ID, d.Version)
}
return nil
@@ -523,18 +523,18 @@ func (s *StateStore) JobsByIDPrefix(ws memdb.WatchSet, id string) (memdb.ResultI
return iter, nil
}
// JobHistoryByID returns all the tracked versions of a job.
func (s *StateStore) JobHistoryByID(ws memdb.WatchSet, id string) ([]*structs.Job, error) {
// JobVersionsByID returns all the tracked versions of a job.
func (s *StateStore) JobVersionsByID(ws memdb.WatchSet, id string) ([]*structs.Job, error) {
txn := s.db.Txn(false)
return s.jobHistoryByID(txn, &ws, id)
return s.jobVersionByID(txn, &ws, id)
}
// jobHistoryByID is the underlying implementation for retrieving all tracked
// jobVersionByID is the underlying implementation for retrieving all tracked
// versions of a job and is called under an existing transaction. A watch set
// can optionally be passed in to add the job histories to the watch set.
func (s *StateStore) jobHistoryByID(txn *memdb.Txn, ws *memdb.WatchSet, id string) ([]*structs.Job, error) {
func (s *StateStore) jobVersionByID(txn *memdb.Txn, ws *memdb.WatchSet, id string) ([]*structs.Job, error) {
// Get all the historic jobs for this ID
iter, err := txn.Get("job_histories", "id_prefix", id)
iter, err := txn.Get("job_versions", "id_prefix", id)
if err != nil {
return nil, err
}

View File

@@ -421,8 +421,8 @@ func TestStateStore_UpsertJob_Job(t *testing.T) {
t.Fatalf("bad")
}
// Check the job history
allVersions, err := state.JobHistoryByID(ws, job.ID)
// Check the job versions
allVersions, err := state.JobVersionsByID(ws, job.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
@@ -504,8 +504,8 @@ func TestStateStore_UpdateUpsertJob_Job(t *testing.T) {
t.Fatalf("nil summary for task group")
}
// Check the job history
allVersions, err := state.JobHistoryByID(ws, job.ID)
// Check the job versions
allVersions, err := state.JobVersionsByID(ws, job.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
@@ -658,7 +658,7 @@ func TestStateStore_UpsertJob_ChildJob(t *testing.T) {
}
}
func TestStateStore_UpdateUpsertJob_JobHistory(t *testing.T) {
func TestStateStore_UpdateUpsertJob_JobVersion(t *testing.T) {
state := testStateStore(t)
// Create a job and mark it as stable
@@ -668,7 +668,7 @@ func TestStateStore_UpdateUpsertJob_JobHistory(t *testing.T) {
// Create a watchset so we can test that upsert fires the watch
ws := memdb.NewWatchSet()
_, err := state.JobHistoryByID(ws, job.ID)
_, err := state.JobVersionsByID(ws, job.ID)
if err != nil {
t.Fatalf("bad: %v", err)
}
@@ -712,7 +712,7 @@ func TestStateStore_UpdateUpsertJob_JobHistory(t *testing.T) {
t.Fatalf("bad: %#v", out)
}
index, err := state.Index("job_histories")
index, err := state.Index("job_versions")
if err != nil {
t.Fatalf("err: %v", err)
}
@@ -720,12 +720,12 @@ func TestStateStore_UpdateUpsertJob_JobHistory(t *testing.T) {
t.Fatalf("bad: %d", index)
}
// Check the job history
allVersions, err := state.JobHistoryByID(ws, job.ID)
// Check the job versions
allVersions, err := state.JobVersionsByID(ws, job.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if len(allVersions) != structs.JobDefaultHistoricCount {
if len(allVersions) != structs.JobTrackedVersions {
t.Fatalf("got %d; want 1", len(allVersions))
}
@@ -737,7 +737,7 @@ func TestStateStore_UpdateUpsertJob_JobHistory(t *testing.T) {
}
// Ensure we didn't delete the stable job
if a := allVersions[structs.JobDefaultHistoricCount-1]; a.ID != job.ID ||
if a := allVersions[structs.JobTrackedVersions-1]; a.ID != job.ID ||
a.Version != 0 || a.Priority != 0 || !a.Stable {
t.Fatalf("bad: %+v", a)
}

View File

@@ -1098,9 +1098,9 @@ const (
// for the system to remain healthy.
CoreJobPriority = JobMaxPriority * 2
// JobDefaultHistoricCount is the number of historic job versions that are
// JobTrackedVersions is the number of historic job versions that are
// kept.
JobDefaultHistoricCount = 6
JobTrackedVersions = 6
)
// Job is the scope of a scheduling request to Nomad. It is the largest