Fix upgrading from 0.6.x to 0.7.0

This commit is contained in:
Alex Dadgar
2017-09-18 12:47:32 -07:00
parent 66776b7c83
commit 907f234a04

View File

@@ -197,9 +197,9 @@ func (s *StateStore) UpsertJobSummary(index uint64, jobSummary *structs.JobSumma
txn := s.db.Txn(true)
defer txn.Abort()
// TODO(alex): Remove before releasing
// COMPAT 0.7: Upgrade old objects that do not have namespaces
if jobSummary.Namespace == "" {
panic("empty namespace")
jobSummary.Namespace = structs.DefaultNamespace
}
// Check if the job summary already exists
@@ -237,8 +237,9 @@ func (s *StateStore) DeleteJobSummary(index uint64, namespace, id string) error
txn := s.db.Txn(true)
defer txn.Abort()
// COMPAT 0.7: Upgrade old objects that do not have namespaces
if namespace == "" {
panic("empty namespace")
namespace = structs.DefaultNamespace
}
// Delete the job summary
@@ -271,6 +272,11 @@ func (s *StateStore) upsertDeploymentImpl(index uint64, deployment *structs.Depl
return fmt.Errorf("deployment lookup failed: %v", err)
}
// COMPAT 0.7: Upgrade old objects that do not have namespaces
if deployment.Namespace == "" {
deployment.Namespace = structs.DefaultNamespace
}
// Setup the indexes correctly
if existing != nil {
deployment.CreateIndex = existing.(*structs.Deployment).CreateIndex
@@ -377,8 +383,9 @@ func (s *StateStore) deploymentByIDImpl(ws memdb.WatchSet, deploymentID string,
func (s *StateStore) DeploymentsByJobID(ws memdb.WatchSet, namespace, jobID string) ([]*structs.Deployment, error) {
txn := s.db.Txn(false)
// COMPAT 0.7: Upgrade old objects that do not have namespaces
if namespace == "" {
panic("empty namespace")
namespace = structs.DefaultNamespace
}
// Get an iterator over the deployments
@@ -408,8 +415,9 @@ func (s *StateStore) DeploymentsByJobID(ws memdb.WatchSet, namespace, jobID stri
func (s *StateStore) LatestDeploymentByJobID(ws memdb.WatchSet, namespace, jobID string) (*structs.Deployment, error) {
txn := s.db.Txn(false)
// COMPAT 0.7: Upgrade old objects that do not have namespaces
if namespace == "" {
panic("empty namespace")
namespace = structs.DefaultNamespace
}
// Get an iterator over the deployments
@@ -656,8 +664,9 @@ func (s *StateStore) UpsertJob(index uint64, job *structs.Job) error {
// upsertJobImpl is the implementation for registering a job or updating a job definition
func (s *StateStore) upsertJobImpl(index uint64, job *structs.Job, keepVersion bool, txn *memdb.Txn) error {
// COMPAT 0.7: Upgrade old objects that do not have namespaces
if job.Namespace == "" {
panic("empty namespace")
job.Namespace = structs.DefaultNamespace
}
// Assert the namespace exists
@@ -740,9 +749,11 @@ func (s *StateStore) DeleteJob(index uint64, namespace, jobID string) error {
txn := s.db.Txn(true)
defer txn.Abort()
// COMPAT 0.7: Upgrade old objects that do not have namespaces
if namespace == "" {
panic("empty namespace")
namespace = structs.DefaultNamespace
}
// Lookup the node
existing, err := txn.First("jobs", "id", namespace, jobID)
if err != nil {
@@ -786,6 +797,11 @@ func (s *StateStore) DeleteJob(index uint64, namespace, jobID string) error {
// Update the modify index
pSummary.ModifyIndex = index
// COMPAT 0.7: Upgrade old objects that do not have namespaces
if pSummary.Namespace == "" {
pSummary.Namespace = structs.DefaultNamespace
}
// Insert the summary
if err := txn.Insert("job_summary", pSummary); err != nil {
return fmt.Errorf("job summary insert failed: %v", err)
@@ -825,9 +841,11 @@ func (s *StateStore) DeleteJob(index uint64, namespace, jobID string) error {
// deleteJobVersions deletes all versions of the given job.
func (s *StateStore) deleteJobVersions(index uint64, job *structs.Job, txn *memdb.Txn) error {
// COMPAT 0.7: Upgrade old objects that do not have namespaces
if job.Namespace == "" {
panic("empty namespace")
job.Namespace = structs.DefaultNamespace
}
iter, err := txn.Get("job_version", "id_prefix", job.Namespace, job.ID)
if err != nil {
return err
@@ -860,9 +878,11 @@ func (s *StateStore) deleteJobVersions(index uint64, job *structs.Job, txn *memd
// upsertJobVersion inserts a job into its historic version table and limits the
// number of job versions that are tracked.
func (s *StateStore) upsertJobVersion(index uint64, job *structs.Job, txn *memdb.Txn) error {
// COMPAT 0.7: Upgrade old objects that do not have namespaces
if job.Namespace == "" {
panic("empty namespace")
job.Namespace = structs.DefaultNamespace
}
// Insert the job
if err := txn.Insert("job_version", job); err != nil {
return fmt.Errorf("failed to insert job into job_version table: %v", err)
@@ -914,9 +934,11 @@ func (s *StateStore) upsertJobVersion(index uint64, job *structs.Job, txn *memdb
func (s *StateStore) JobByID(ws memdb.WatchSet, namespace, id string) (*structs.Job, error) {
txn := s.db.Txn(false)
// COMPAT 0.7: Upgrade old objects that do not have namespaces
if namespace == "" {
panic("empty namespace")
namespace = structs.DefaultNamespace
}
watchCh, existing, err := txn.FirstWatch("jobs", "id", namespace, id)
if err != nil {
return nil, fmt.Errorf("job lookup failed: %v", err)
@@ -933,8 +955,9 @@ func (s *StateStore) JobByID(ws memdb.WatchSet, namespace, id string) (*structs.
func (s *StateStore) JobsByIDPrefix(ws memdb.WatchSet, namespace, id string) (memdb.ResultIterator, error) {
txn := s.db.Txn(false)
// COMPAT 0.7: Upgrade old objects that do not have namespaces
if namespace == "" {
panic("empty namespace")
namespace = structs.DefaultNamespace
}
iter, err := txn.Get("jobs", "id_prefix", namespace, id)
@@ -950,9 +973,12 @@ func (s *StateStore) JobsByIDPrefix(ws memdb.WatchSet, namespace, id string) (me
// JobVersionsByID returns all the tracked versions of a job.
func (s *StateStore) JobVersionsByID(ws memdb.WatchSet, namespace, id string) ([]*structs.Job, error) {
txn := s.db.Txn(false)
// COMPAT 0.7: Upgrade old objects that do not have namespaces
if namespace == "" {
panic("empty namespace")
namespace = structs.DefaultNamespace
}
return s.jobVersionByID(txn, &ws, namespace, id)
}
@@ -960,9 +986,11 @@ func (s *StateStore) JobVersionsByID(ws memdb.WatchSet, namespace, id string) ([
// versions of a job and is called under an existing transaction. A watch set
// can optionally be passed in to add the job histories to the watch set.
func (s *StateStore) jobVersionByID(txn *memdb.Txn, ws *memdb.WatchSet, namespace, id string) ([]*structs.Job, error) {
// COMPAT 0.7: Upgrade old objects that do not have namespaces
if namespace == "" {
panic("empty namespace")
namespace = structs.DefaultNamespace
}
// Get all the historic jobs for this ID
iter, err := txn.Get("job_version", "id_prefix", namespace, id)
if err != nil {
@@ -1000,8 +1028,9 @@ func (s *StateStore) jobVersionByID(txn *memdb.Txn, ws *memdb.WatchSet, namespac
// JobByIDAndVersion returns the job identified by its ID and Version. The
// passed watchset may be nil.
func (s *StateStore) JobByIDAndVersion(ws memdb.WatchSet, namespace, id string, version uint64) (*structs.Job, error) {
// COMPAT 0.7: Upgrade old objects that do not have namespaces
if namespace == "" {
panic("empty namespace")
namespace = structs.DefaultNamespace
}
txn := s.db.Txn(false)
return s.jobByIDAndVersionImpl(ws, namespace, id, version, txn)
@@ -1011,8 +1040,9 @@ func (s *StateStore) JobByIDAndVersion(ws memdb.WatchSet, namespace, id string,
// passed watchset may be nil.
func (s *StateStore) jobByIDAndVersionImpl(ws memdb.WatchSet, namespace, id string,
version uint64, txn *memdb.Txn) (*structs.Job, error) {
// COMPAT 0.7: Upgrade old objects that do not have namespaces
if namespace == "" {
panic("empty namespace")
namespace = structs.DefaultNamespace
}
watchCh, existing, err := txn.FirstWatch("job_version", "id", namespace, id, version)
@@ -1128,9 +1158,11 @@ func (s *StateStore) JobsByGC(ws memdb.WatchSet, gc bool) (memdb.ResultIterator,
func (s *StateStore) JobSummaryByID(ws memdb.WatchSet, namespace, jobID string) (*structs.JobSummary, error) {
txn := s.db.Txn(false)
// COMPAT 0.7: Upgrade old objects that do not have namespaces
if namespace == "" {
panic("empty namespace")
namespace = structs.DefaultNamespace
}
watchCh, existing, err := txn.FirstWatch("job_summary", "id", namespace, jobID)
if err != nil {
return nil, err
@@ -1165,9 +1197,11 @@ func (s *StateStore) JobSummaries(ws memdb.WatchSet) (memdb.ResultIterator, erro
func (s *StateStore) JobSummaryByPrefix(ws memdb.WatchSet, namespace, id string) (memdb.ResultIterator, error) {
txn := s.db.Txn(false)
// COMPAT 0.7: Upgrade old objects that do not have namespaces
if namespace == "" {
panic("empty namespace")
namespace = structs.DefaultNamespace
}
iter, err := txn.Get("job_summary", "id_prefix", namespace, id)
if err != nil {
return nil, fmt.Errorf("eval lookup failed: %v", err)
@@ -1183,9 +1217,11 @@ func (s *StateStore) UpsertPeriodicLaunch(index uint64, launch *structs.Periodic
txn := s.db.Txn(true)
defer txn.Abort()
// COMPAT 0.7: Upgrade old objects that do not have namespaces
if launch.Namespace == "" {
panic("empty namespace")
launch.Namespace = structs.DefaultNamespace
}
// Check if the job already exists
existing, err := txn.First("periodic_launch", "id", launch.Namespace, launch.ID)
if err != nil {
@@ -1218,9 +1254,11 @@ func (s *StateStore) DeletePeriodicLaunch(index uint64, namespace, jobID string)
txn := s.db.Txn(true)
defer txn.Abort()
// COMPAT 0.7: Upgrade old objects that do not have namespaces
if namespace == "" {
panic("empty namespace")
namespace = structs.DefaultNamespace
}
// Lookup the launch
existing, err := txn.First("periodic_launch", "id", namespace, jobID)
if err != nil {
@@ -1247,9 +1285,11 @@ func (s *StateStore) DeletePeriodicLaunch(index uint64, namespace, jobID string)
func (s *StateStore) PeriodicLaunchByID(ws memdb.WatchSet, namespace, id string) (*structs.PeriodicLaunch, error) {
txn := s.db.Txn(false)
// COMPAT 0.7: Upgrade old objects that do not have namespaces
if namespace == "" {
panic("empty namespace")
namespace = structs.DefaultNamespace
}
watchCh, existing, err := txn.FirstWatch("periodic_launch", "id", namespace, id)
if err != nil {
return nil, fmt.Errorf("periodic launch lookup failed: %v", err)
@@ -1314,6 +1354,11 @@ func (s *StateStore) nestedUpsertEval(txn *memdb.Txn, index uint64, eval *struct
return fmt.Errorf("eval lookup failed: %v", err)
}
// COMPAT 0.7: Upgrade old objects that do not have namespaces
if eval.Namespace == "" {
eval.Namespace = structs.DefaultNamespace
}
// Update the indexes
if existing != nil {
eval.CreateIndex = existing.(*structs.Evaluation).CreateIndex
@@ -1323,9 +1368,6 @@ func (s *StateStore) nestedUpsertEval(txn *memdb.Txn, index uint64, eval *struct
eval.ModifyIndex = index
}
if eval.Namespace == "" {
panic("empty namespace")
}
// Update the job summary
summaryRaw, err := txn.First("job_summary", "id", eval.Namespace, eval.JobID)
if err != nil {
@@ -1348,6 +1390,11 @@ func (s *StateStore) nestedUpsertEval(txn *memdb.Txn, index uint64, eval *struct
// Insert the job summary
if hasSummaryChanged {
// COMPAT 0.7: Upgrade old objects that do not have namespaces
if js.Namespace == "" {
js.Namespace = structs.DefaultNamespace
}
js.ModifyIndex = index
if err := txn.Insert("job_summary", js); err != nil {
return fmt.Errorf("job summary insert failed: %v", err)
@@ -1381,6 +1428,12 @@ func (s *StateStore) nestedUpsertEval(txn *memdb.Txn, index uint64, eval *struct
newEval.Status = structs.EvalStatusCancelled
newEval.StatusDescription = fmt.Sprintf("evaluation %q successful", newEval.ID)
newEval.ModifyIndex = index
// COMPAT 0.7: Upgrade old objects that do not have namespaces
if newEval.Namespace == "" {
newEval.Namespace = structs.DefaultNamespace
}
if err := txn.Insert("evals", newEval); err != nil {
return fmt.Errorf("eval insert failed: %v", err)
}
@@ -1483,6 +1536,11 @@ func (s *StateStore) EvalsByIDPrefix(ws memdb.WatchSet, namespace, id string) (m
ws.Add(iter.WatchCh())
// COMPAT 0.7: Upgrade old objects that do not have namespaces
if namespace == "" {
namespace = structs.DefaultNamespace
}
// Wrap the iterator in a filter
wrap := memdb.NewFilterIterator(iter, evalNamespaceFilter(namespace))
return wrap, nil
@@ -1505,9 +1563,11 @@ func evalNamespaceFilter(namespace string) func(interface{}) bool {
func (s *StateStore) EvalsByJob(ws memdb.WatchSet, namespace, jobID string) ([]*structs.Evaluation, error) {
txn := s.db.Txn(false)
// COMPAT 0.7: Upgrade old objects that do not have namespaces
if namespace == "" {
panic("empty namespace")
namespace = structs.DefaultNamespace
}
// Get an iterator over the node allocations
iter, err := txn.Get("evals", "job_prefix", namespace, jobID)
if err != nil {
@@ -1608,6 +1668,11 @@ func (s *StateStore) nestedUpdateAllocFromClient(txn *memdb.Txn, index uint64, a
// Copy everything from the existing allocation
copyAlloc := exist.Copy()
// COMPAT 0.7: Upgrade old objects that do not have namespaces
if copyAlloc.Namespace == "" {
copyAlloc.Namespace = structs.DefaultNamespace
}
// Pull in anything the client is the authority on
copyAlloc.ClientStatus = alloc.ClientStatus
copyAlloc.ClientDescription = alloc.ClientDescription
@@ -1712,6 +1777,11 @@ func (s *StateStore) upsertAllocsImpl(index uint64, allocs []*structs.Allocation
}
}
// COMPAT 0.7: Upgrade old objects that do not have namespaces
if alloc.Namespace == "" {
alloc.Namespace = structs.DefaultNamespace
}
if err := s.updateDeploymentWithAlloc(index, alloc, exist, txn); err != nil {
return fmt.Errorf("error updating deployment: %v", err)
}
@@ -1853,9 +1923,11 @@ func (s *StateStore) AllocsByNodeTerminal(ws memdb.WatchSet, node string, termin
func (s *StateStore) AllocsByJob(ws memdb.WatchSet, namespace, jobID string, all bool) ([]*structs.Allocation, error) {
txn := s.db.Txn(false)
// COMPAT 0.7: Upgrade old objects that do not have namespaces
if namespace == "" {
panic("empty namespace")
namespace = structs.DefaultNamespace
}
// Get the job
var job *structs.Job
rawJob, err := txn.First("jobs", "id", namespace, jobID)
@@ -2139,6 +2211,11 @@ func (s *StateStore) updateDeploymentStatusImpl(index uint64, u *structs.Deploym
copy.StatusDescription = u.StatusDescription
copy.ModifyIndex = index
// COMPAT 0.7: Upgrade old objects that do not have namespaces
if copy.Namespace == "" {
copy.Namespace = structs.DefaultNamespace
}
// Insert the deployment
if err := txn.Insert("deployment", copy); err != nil {
return err
@@ -2148,9 +2225,6 @@ func (s *StateStore) updateDeploymentStatusImpl(index uint64, u *structs.Deploym
if err := txn.Insert("index", &IndexEntry{"deployment", index}); err != nil {
return fmt.Errorf("index update failed: %v", err)
}
if copy.Namespace == "" {
panic("empty namespace")
}
// If the deployment is being marked as complete, set the job to stable.
if copy.Status == structs.DeploymentStatusSuccessful {
@@ -2167,8 +2241,10 @@ func (s *StateStore) updateDeploymentStatusImpl(index uint64, u *structs.Deploym
func (s *StateStore) UpdateJobStability(index uint64, namespace, jobID string, jobVersion uint64, stable bool) error {
txn := s.db.Txn(true)
defer txn.Abort()
// COMPAT 0.7: Upgrade old objects that do not have namespaces
if namespace == "" {
panic("empty namespace")
namespace = structs.DefaultNamespace
}
if err := s.updateJobStabilityImpl(index, namespace, jobID, jobVersion, stable, txn); err != nil {
@@ -2181,9 +2257,11 @@ func (s *StateStore) UpdateJobStability(index uint64, namespace, jobID string, j
// updateJobStabilityImpl updates the stability of the given job and version
func (s *StateStore) updateJobStabilityImpl(index uint64, namespace, jobID string, jobVersion uint64, stable bool, txn *memdb.Txn) error {
// COMPAT 0.7: Upgrade old objects that do not have namespaces
if namespace == "" {
panic("empty namespace")
namespace = structs.DefaultNamespace
}
// Get the job that is referenced
job, err := s.jobByIDAndVersionImpl(nil, namespace, jobID, jobVersion, txn)
if err != nil {
@@ -2497,8 +2575,10 @@ func (s *StateStore) ReconcileJobSummaries(index uint64) error {
for _, tg := range job.TaskGroups {
summary.Summary[tg.Name] = structs.TaskGroupSummary{}
}
// COMPAT 0.7: Upgrade old objects that do not have namespaces
if job.Namespace == "" {
panic("empty namespace")
job.Namespace = structs.DefaultNamespace
}
// Find all the allocations for the jobs
@@ -2564,9 +2644,11 @@ func (s *StateStore) ReconcileJobSummaries(index uint64) error {
func (s *StateStore) setJobStatuses(index uint64, txn *memdb.Txn,
jobs map[structs.NamespacedID]string, evalDelete bool) error {
for tuple, forceStatus := range jobs {
// COMPAT 0.7: Upgrade old objects that do not have namespaces
if tuple.Namespace == "" {
panic("empty namespace")
tuple.Namespace = structs.DefaultNamespace
}
existing, err := txn.First("jobs", "id", tuple.Namespace, tuple.ID)
if err != nil {
return fmt.Errorf("job lookup failed: %v", err)
@@ -2618,6 +2700,11 @@ func (s *StateStore) setJobStatus(index uint64, txn *memdb.Txn,
updated.Status = newStatus
updated.ModifyIndex = index
// COMPAT 0.7: Upgrade old objects that do not have namespaces
if updated.Namespace == "" {
updated.Namespace = structs.DefaultNamespace
}
// Insert the job
if err := txn.Insert("jobs", updated); err != nil {
return fmt.Errorf("job insert failed: %v", err)
@@ -2626,10 +2713,6 @@ func (s *StateStore) setJobStatus(index uint64, txn *memdb.Txn,
return fmt.Errorf("index update failed: %v", err)
}
if updated.Namespace == "" {
panic("empty namespace")
}
// Update the children summary
if updated.ParentID != "" {
// Try to update the summary of the parent job summary
@@ -2647,6 +2730,11 @@ func (s *StateStore) setJobStatus(index uint64, txn *memdb.Txn,
pSummary.Children = new(structs.JobChildrenSummary)
}
// COMPAT 0.7: Upgrade old objects that do not have namespaces
if pSummary.Namespace == "" {
pSummary.Namespace = structs.DefaultNamespace
}
// Determine the transition and update the correct fields
children := pSummary.Children
@@ -2693,9 +2781,11 @@ func (s *StateStore) setJobStatus(index uint64, txn *memdb.Txn,
}
func (s *StateStore) getJobStatus(txn *memdb.Txn, job *structs.Job, evalDelete bool) (string, error) {
// COMPAT 0.7: Upgrade old objects that do not have namespaces
if job.Namespace == "" {
panic("empty namespace")
job.Namespace = structs.DefaultNamespace
}
allocs, err := txn.Get("allocs", "job", job.Namespace, job.ID)
if err != nil {
return "", err
@@ -2765,8 +2855,9 @@ func (s *StateStore) getJobStatus(txn *memdb.Txn, job *structs.Job, evalDelete b
func (s *StateStore) updateSummaryWithJob(index uint64, job *structs.Job,
txn *memdb.Txn) error {
// COMPAT 0.7: Upgrade old objects that do not have namespaces
if job.Namespace == "" {
panic("empty namespace")
job.Namespace = structs.DefaultNamespace
}
// Update the job summary
@@ -2808,6 +2899,11 @@ func (s *StateStore) updateSummaryWithJob(index uint64, job *structs.Job,
if hasSummaryChanged {
summary.ModifyIndex = index
// COMPAT 0.7: Upgrade old objects that do not have namespaces
if summary.Namespace == "" {
summary.Namespace = structs.DefaultNamespace
}
// Update the indexes table for job summary
if err := txn.Insert("index", &IndexEntry{"job_summary", index}); err != nil {
return fmt.Errorf("index update failed: %v", err)
@@ -2908,8 +3004,9 @@ func (s *StateStore) updateSummaryWithAlloc(index uint64, alloc *structs.Allocat
if alloc.Job == nil {
return nil
}
// COMPAT 0.7: Upgrade old objects that do not have namespaces
if alloc.Namespace == "" {
panic("empty namespace")
alloc.Namespace = structs.DefaultNamespace
}
summaryRaw, err := txn.First("job_summary", "id", alloc.Namespace, alloc.JobID)
@@ -3000,6 +3097,11 @@ func (s *StateStore) updateSummaryWithAlloc(index uint64, alloc *structs.Allocat
if summaryChanged {
jobSummary.ModifyIndex = index
// COMPAT 0.7: Upgrade old objects that do not have namespaces
if jobSummary.Namespace == "" {
jobSummary.Namespace = structs.DefaultNamespace
}
// Update the indexes table for job summary
if err := txn.Insert("index", &IndexEntry{"job_summary", index}); err != nil {
return fmt.Errorf("index update failed: %v", err)