From 907f234a04a7f54f4a85f4ebf5b4accaef995cdb Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Mon, 18 Sep 2017 12:47:32 -0700 Subject: [PATCH] Fix upgrading from 0.6.x to 0.7.0 --- nomad/state/state_store.go | 180 +++++++++++++++++++++++++++++-------- 1 file changed, 141 insertions(+), 39 deletions(-) diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index e7b9fe852..f01df88d5 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -197,9 +197,9 @@ func (s *StateStore) UpsertJobSummary(index uint64, jobSummary *structs.JobSumma txn := s.db.Txn(true) defer txn.Abort() - // TODO(alex): Remove before releasing + // COMPAT 0.7: Upgrade old objects that do not have namespaces if jobSummary.Namespace == "" { - panic("empty namespace") + jobSummary.Namespace = structs.DefaultNamespace } // Check if the job summary already exists @@ -237,8 +237,9 @@ func (s *StateStore) DeleteJobSummary(index uint64, namespace, id string) error txn := s.db.Txn(true) defer txn.Abort() + // COMPAT 0.7: Upgrade old objects that do not have namespaces if namespace == "" { - panic("empty namespace") + namespace = structs.DefaultNamespace } // Delete the job summary @@ -271,6 +272,11 @@ func (s *StateStore) upsertDeploymentImpl(index uint64, deployment *structs.Depl return fmt.Errorf("deployment lookup failed: %v", err) } + // COMPAT 0.7: Upgrade old objects that do not have namespaces + if deployment.Namespace == "" { + deployment.Namespace = structs.DefaultNamespace + } + // Setup the indexes correctly if existing != nil { deployment.CreateIndex = existing.(*structs.Deployment).CreateIndex @@ -377,8 +383,9 @@ func (s *StateStore) deploymentByIDImpl(ws memdb.WatchSet, deploymentID string, func (s *StateStore) DeploymentsByJobID(ws memdb.WatchSet, namespace, jobID string) ([]*structs.Deployment, error) { txn := s.db.Txn(false) + // COMPAT 0.7: Upgrade old objects that do not have namespaces if namespace == "" { - panic("empty namespace") + namespace = structs.DefaultNamespace } // Get an iterator over the deployments @@ -408,8 +415,9 @@ func (s *StateStore) DeploymentsByJobID(ws memdb.WatchSet, namespace, jobID stri func (s *StateStore) LatestDeploymentByJobID(ws memdb.WatchSet, namespace, jobID string) (*structs.Deployment, error) { txn := s.db.Txn(false) + // COMPAT 0.7: Upgrade old objects that do not have namespaces if namespace == "" { - panic("empty namespace") + namespace = structs.DefaultNamespace } // Get an iterator over the deployments @@ -656,8 +664,9 @@ func (s *StateStore) UpsertJob(index uint64, job *structs.Job) error { // upsertJobImpl is the implementation for registering a job or updating a job definition func (s *StateStore) upsertJobImpl(index uint64, job *structs.Job, keepVersion bool, txn *memdb.Txn) error { + // COMPAT 0.7: Upgrade old objects that do not have namespaces if job.Namespace == "" { - panic("empty namespace") + job.Namespace = structs.DefaultNamespace } // Assert the namespace exists @@ -740,9 +749,11 @@ func (s *StateStore) DeleteJob(index uint64, namespace, jobID string) error { txn := s.db.Txn(true) defer txn.Abort() + // COMPAT 0.7: Upgrade old objects that do not have namespaces if namespace == "" { - panic("empty namespace") + namespace = structs.DefaultNamespace } + // Lookup the node existing, err := txn.First("jobs", "id", namespace, jobID) if err != nil { @@ -786,6 +797,11 @@ func (s *StateStore) DeleteJob(index uint64, namespace, jobID string) error { // Update the modify index pSummary.ModifyIndex = index + // COMPAT 0.7: Upgrade old objects that do not have namespaces + if pSummary.Namespace == "" { + pSummary.Namespace = structs.DefaultNamespace + } + // Insert the summary if err := txn.Insert("job_summary", pSummary); err != nil { return fmt.Errorf("job summary insert failed: %v", err) @@ -825,9 +841,11 @@ func (s *StateStore) DeleteJob(index uint64, namespace, jobID string) error { // deleteJobVersions deletes all versions of the given job. func (s *StateStore) deleteJobVersions(index uint64, job *structs.Job, txn *memdb.Txn) error { + // COMPAT 0.7: Upgrade old objects that do not have namespaces if job.Namespace == "" { - panic("empty namespace") + job.Namespace = structs.DefaultNamespace } + iter, err := txn.Get("job_version", "id_prefix", job.Namespace, job.ID) if err != nil { return err @@ -860,9 +878,11 @@ func (s *StateStore) deleteJobVersions(index uint64, job *structs.Job, txn *memd // upsertJobVersion inserts a job into its historic version table and limits the // number of job versions that are tracked. func (s *StateStore) upsertJobVersion(index uint64, job *structs.Job, txn *memdb.Txn) error { + // COMPAT 0.7: Upgrade old objects that do not have namespaces if job.Namespace == "" { - panic("empty namespace") + job.Namespace = structs.DefaultNamespace } + // Insert the job if err := txn.Insert("job_version", job); err != nil { return fmt.Errorf("failed to insert job into job_version table: %v", err) @@ -914,9 +934,11 @@ func (s *StateStore) upsertJobVersion(index uint64, job *structs.Job, txn *memdb func (s *StateStore) JobByID(ws memdb.WatchSet, namespace, id string) (*structs.Job, error) { txn := s.db.Txn(false) + // COMPAT 0.7: Upgrade old objects that do not have namespaces if namespace == "" { - panic("empty namespace") + namespace = structs.DefaultNamespace } + watchCh, existing, err := txn.FirstWatch("jobs", "id", namespace, id) if err != nil { return nil, fmt.Errorf("job lookup failed: %v", err) @@ -933,8 +955,9 @@ func (s *StateStore) JobByID(ws memdb.WatchSet, namespace, id string) (*structs. func (s *StateStore) JobsByIDPrefix(ws memdb.WatchSet, namespace, id string) (memdb.ResultIterator, error) { txn := s.db.Txn(false) + // COMPAT 0.7: Upgrade old objects that do not have namespaces if namespace == "" { - panic("empty namespace") + namespace = structs.DefaultNamespace } iter, err := txn.Get("jobs", "id_prefix", namespace, id) @@ -950,9 +973,12 @@ func (s *StateStore) JobsByIDPrefix(ws memdb.WatchSet, namespace, id string) (me // JobVersionsByID returns all the tracked versions of a job. func (s *StateStore) JobVersionsByID(ws memdb.WatchSet, namespace, id string) ([]*structs.Job, error) { txn := s.db.Txn(false) + + // COMPAT 0.7: Upgrade old objects that do not have namespaces if namespace == "" { - panic("empty namespace") + namespace = structs.DefaultNamespace } + return s.jobVersionByID(txn, &ws, namespace, id) } @@ -960,9 +986,11 @@ func (s *StateStore) JobVersionsByID(ws memdb.WatchSet, namespace, id string) ([ // versions of a job and is called under an existing transaction. A watch set // can optionally be passed in to add the job histories to the watch set. func (s *StateStore) jobVersionByID(txn *memdb.Txn, ws *memdb.WatchSet, namespace, id string) ([]*structs.Job, error) { + // COMPAT 0.7: Upgrade old objects that do not have namespaces if namespace == "" { - panic("empty namespace") + namespace = structs.DefaultNamespace } + // Get all the historic jobs for this ID iter, err := txn.Get("job_version", "id_prefix", namespace, id) if err != nil { @@ -1000,8 +1028,9 @@ func (s *StateStore) jobVersionByID(txn *memdb.Txn, ws *memdb.WatchSet, namespac // JobByIDAndVersion returns the job identified by its ID and Version. The // passed watchset may be nil. func (s *StateStore) JobByIDAndVersion(ws memdb.WatchSet, namespace, id string, version uint64) (*structs.Job, error) { + // COMPAT 0.7: Upgrade old objects that do not have namespaces if namespace == "" { - panic("empty namespace") + namespace = structs.DefaultNamespace } txn := s.db.Txn(false) return s.jobByIDAndVersionImpl(ws, namespace, id, version, txn) @@ -1011,8 +1040,9 @@ func (s *StateStore) JobByIDAndVersion(ws memdb.WatchSet, namespace, id string, // passed watchset may be nil. func (s *StateStore) jobByIDAndVersionImpl(ws memdb.WatchSet, namespace, id string, version uint64, txn *memdb.Txn) (*structs.Job, error) { + // COMPAT 0.7: Upgrade old objects that do not have namespaces if namespace == "" { - panic("empty namespace") + namespace = structs.DefaultNamespace } watchCh, existing, err := txn.FirstWatch("job_version", "id", namespace, id, version) @@ -1128,9 +1158,11 @@ func (s *StateStore) JobsByGC(ws memdb.WatchSet, gc bool) (memdb.ResultIterator, func (s *StateStore) JobSummaryByID(ws memdb.WatchSet, namespace, jobID string) (*structs.JobSummary, error) { txn := s.db.Txn(false) + // COMPAT 0.7: Upgrade old objects that do not have namespaces if namespace == "" { - panic("empty namespace") + namespace = structs.DefaultNamespace } + watchCh, existing, err := txn.FirstWatch("job_summary", "id", namespace, jobID) if err != nil { return nil, err @@ -1165,9 +1197,11 @@ func (s *StateStore) JobSummaries(ws memdb.WatchSet) (memdb.ResultIterator, erro func (s *StateStore) JobSummaryByPrefix(ws memdb.WatchSet, namespace, id string) (memdb.ResultIterator, error) { txn := s.db.Txn(false) + // COMPAT 0.7: Upgrade old objects that do not have namespaces if namespace == "" { - panic("empty namespace") + namespace = structs.DefaultNamespace } + iter, err := txn.Get("job_summary", "id_prefix", namespace, id) if err != nil { return nil, fmt.Errorf("eval lookup failed: %v", err) @@ -1183,9 +1217,11 @@ func (s *StateStore) UpsertPeriodicLaunch(index uint64, launch *structs.Periodic txn := s.db.Txn(true) defer txn.Abort() + // COMPAT 0.7: Upgrade old objects that do not have namespaces if launch.Namespace == "" { - panic("empty namespace") + launch.Namespace = structs.DefaultNamespace } + // Check if the job already exists existing, err := txn.First("periodic_launch", "id", launch.Namespace, launch.ID) if err != nil { @@ -1218,9 +1254,11 @@ func (s *StateStore) DeletePeriodicLaunch(index uint64, namespace, jobID string) txn := s.db.Txn(true) defer txn.Abort() + // COMPAT 0.7: Upgrade old objects that do not have namespaces if namespace == "" { - panic("empty namespace") + namespace = structs.DefaultNamespace } + // Lookup the launch existing, err := txn.First("periodic_launch", "id", namespace, jobID) if err != nil { @@ -1247,9 +1285,11 @@ func (s *StateStore) DeletePeriodicLaunch(index uint64, namespace, jobID string) func (s *StateStore) PeriodicLaunchByID(ws memdb.WatchSet, namespace, id string) (*structs.PeriodicLaunch, error) { txn := s.db.Txn(false) + // COMPAT 0.7: Upgrade old objects that do not have namespaces if namespace == "" { - panic("empty namespace") + namespace = structs.DefaultNamespace } + watchCh, existing, err := txn.FirstWatch("periodic_launch", "id", namespace, id) if err != nil { return nil, fmt.Errorf("periodic launch lookup failed: %v", err) @@ -1314,6 +1354,11 @@ func (s *StateStore) nestedUpsertEval(txn *memdb.Txn, index uint64, eval *struct return fmt.Errorf("eval lookup failed: %v", err) } + // COMPAT 0.7: Upgrade old objects that do not have namespaces + if eval.Namespace == "" { + eval.Namespace = structs.DefaultNamespace + } + // Update the indexes if existing != nil { eval.CreateIndex = existing.(*structs.Evaluation).CreateIndex @@ -1323,9 +1368,6 @@ func (s *StateStore) nestedUpsertEval(txn *memdb.Txn, index uint64, eval *struct eval.ModifyIndex = index } - if eval.Namespace == "" { - panic("empty namespace") - } // Update the job summary summaryRaw, err := txn.First("job_summary", "id", eval.Namespace, eval.JobID) if err != nil { @@ -1348,6 +1390,11 @@ func (s *StateStore) nestedUpsertEval(txn *memdb.Txn, index uint64, eval *struct // Insert the job summary if hasSummaryChanged { + // COMPAT 0.7: Upgrade old objects that do not have namespaces + if js.Namespace == "" { + js.Namespace = structs.DefaultNamespace + } + js.ModifyIndex = index if err := txn.Insert("job_summary", js); err != nil { return fmt.Errorf("job summary insert failed: %v", err) @@ -1381,6 +1428,12 @@ func (s *StateStore) nestedUpsertEval(txn *memdb.Txn, index uint64, eval *struct newEval.Status = structs.EvalStatusCancelled newEval.StatusDescription = fmt.Sprintf("evaluation %q successful", newEval.ID) newEval.ModifyIndex = index + + // COMPAT 0.7: Upgrade old objects that do not have namespaces + if newEval.Namespace == "" { + newEval.Namespace = structs.DefaultNamespace + } + if err := txn.Insert("evals", newEval); err != nil { return fmt.Errorf("eval insert failed: %v", err) } @@ -1483,6 +1536,11 @@ func (s *StateStore) EvalsByIDPrefix(ws memdb.WatchSet, namespace, id string) (m ws.Add(iter.WatchCh()) + // COMPAT 0.7: Upgrade old objects that do not have namespaces + if namespace == "" { + namespace = structs.DefaultNamespace + } + // Wrap the iterator in a filter wrap := memdb.NewFilterIterator(iter, evalNamespaceFilter(namespace)) return wrap, nil @@ -1505,9 +1563,11 @@ func evalNamespaceFilter(namespace string) func(interface{}) bool { func (s *StateStore) EvalsByJob(ws memdb.WatchSet, namespace, jobID string) ([]*structs.Evaluation, error) { txn := s.db.Txn(false) + // COMPAT 0.7: Upgrade old objects that do not have namespaces if namespace == "" { - panic("empty namespace") + namespace = structs.DefaultNamespace } + // Get an iterator over the node allocations iter, err := txn.Get("evals", "job_prefix", namespace, jobID) if err != nil { @@ -1608,6 +1668,11 @@ func (s *StateStore) nestedUpdateAllocFromClient(txn *memdb.Txn, index uint64, a // Copy everything from the existing allocation copyAlloc := exist.Copy() + // COMPAT 0.7: Upgrade old objects that do not have namespaces + if copyAlloc.Namespace == "" { + copyAlloc.Namespace = structs.DefaultNamespace + } + // Pull in anything the client is the authority on copyAlloc.ClientStatus = alloc.ClientStatus copyAlloc.ClientDescription = alloc.ClientDescription @@ -1712,6 +1777,11 @@ func (s *StateStore) upsertAllocsImpl(index uint64, allocs []*structs.Allocation } } + // COMPAT 0.7: Upgrade old objects that do not have namespaces + if alloc.Namespace == "" { + alloc.Namespace = structs.DefaultNamespace + } + if err := s.updateDeploymentWithAlloc(index, alloc, exist, txn); err != nil { return fmt.Errorf("error updating deployment: %v", err) } @@ -1853,9 +1923,11 @@ func (s *StateStore) AllocsByNodeTerminal(ws memdb.WatchSet, node string, termin func (s *StateStore) AllocsByJob(ws memdb.WatchSet, namespace, jobID string, all bool) ([]*structs.Allocation, error) { txn := s.db.Txn(false) + // COMPAT 0.7: Upgrade old objects that do not have namespaces if namespace == "" { - panic("empty namespace") + namespace = structs.DefaultNamespace } + // Get the job var job *structs.Job rawJob, err := txn.First("jobs", "id", namespace, jobID) @@ -2139,6 +2211,11 @@ func (s *StateStore) updateDeploymentStatusImpl(index uint64, u *structs.Deploym copy.StatusDescription = u.StatusDescription copy.ModifyIndex = index + // COMPAT 0.7: Upgrade old objects that do not have namespaces + if copy.Namespace == "" { + copy.Namespace = structs.DefaultNamespace + } + // Insert the deployment if err := txn.Insert("deployment", copy); err != nil { return err @@ -2148,9 +2225,6 @@ func (s *StateStore) updateDeploymentStatusImpl(index uint64, u *structs.Deploym if err := txn.Insert("index", &IndexEntry{"deployment", index}); err != nil { return fmt.Errorf("index update failed: %v", err) } - if copy.Namespace == "" { - panic("empty namespace") - } // If the deployment is being marked as complete, set the job to stable. if copy.Status == structs.DeploymentStatusSuccessful { @@ -2167,8 +2241,10 @@ func (s *StateStore) updateDeploymentStatusImpl(index uint64, u *structs.Deploym func (s *StateStore) UpdateJobStability(index uint64, namespace, jobID string, jobVersion uint64, stable bool) error { txn := s.db.Txn(true) defer txn.Abort() + + // COMPAT 0.7: Upgrade old objects that do not have namespaces if namespace == "" { - panic("empty namespace") + namespace = structs.DefaultNamespace } if err := s.updateJobStabilityImpl(index, namespace, jobID, jobVersion, stable, txn); err != nil { @@ -2181,9 +2257,11 @@ func (s *StateStore) UpdateJobStability(index uint64, namespace, jobID string, j // updateJobStabilityImpl updates the stability of the given job and version func (s *StateStore) updateJobStabilityImpl(index uint64, namespace, jobID string, jobVersion uint64, stable bool, txn *memdb.Txn) error { + // COMPAT 0.7: Upgrade old objects that do not have namespaces if namespace == "" { - panic("empty namespace") + namespace = structs.DefaultNamespace } + // Get the job that is referenced job, err := s.jobByIDAndVersionImpl(nil, namespace, jobID, jobVersion, txn) if err != nil { @@ -2497,8 +2575,10 @@ func (s *StateStore) ReconcileJobSummaries(index uint64) error { for _, tg := range job.TaskGroups { summary.Summary[tg.Name] = structs.TaskGroupSummary{} } + + // COMPAT 0.7: Upgrade old objects that do not have namespaces if job.Namespace == "" { - panic("empty namespace") + job.Namespace = structs.DefaultNamespace } // Find all the allocations for the jobs @@ -2564,9 +2644,11 @@ func (s *StateStore) ReconcileJobSummaries(index uint64) error { func (s *StateStore) setJobStatuses(index uint64, txn *memdb.Txn, jobs map[structs.NamespacedID]string, evalDelete bool) error { for tuple, forceStatus := range jobs { + // COMPAT 0.7: Upgrade old objects that do not have namespaces if tuple.Namespace == "" { - panic("empty namespace") + tuple.Namespace = structs.DefaultNamespace } + existing, err := txn.First("jobs", "id", tuple.Namespace, tuple.ID) if err != nil { return fmt.Errorf("job lookup failed: %v", err) @@ -2618,6 +2700,11 @@ func (s *StateStore) setJobStatus(index uint64, txn *memdb.Txn, updated.Status = newStatus updated.ModifyIndex = index + // COMPAT 0.7: Upgrade old objects that do not have namespaces + if updated.Namespace == "" { + updated.Namespace = structs.DefaultNamespace + } + // Insert the job if err := txn.Insert("jobs", updated); err != nil { return fmt.Errorf("job insert failed: %v", err) @@ -2626,10 +2713,6 @@ func (s *StateStore) setJobStatus(index uint64, txn *memdb.Txn, return fmt.Errorf("index update failed: %v", err) } - if updated.Namespace == "" { - panic("empty namespace") - } - // Update the children summary if updated.ParentID != "" { // Try to update the summary of the parent job summary @@ -2647,6 +2730,11 @@ func (s *StateStore) setJobStatus(index uint64, txn *memdb.Txn, pSummary.Children = new(structs.JobChildrenSummary) } + // COMPAT 0.7: Upgrade old objects that do not have namespaces + if pSummary.Namespace == "" { + pSummary.Namespace = structs.DefaultNamespace + } + // Determine the transition and update the correct fields children := pSummary.Children @@ -2693,9 +2781,11 @@ func (s *StateStore) setJobStatus(index uint64, txn *memdb.Txn, } func (s *StateStore) getJobStatus(txn *memdb.Txn, job *structs.Job, evalDelete bool) (string, error) { + // COMPAT 0.7: Upgrade old objects that do not have namespaces if job.Namespace == "" { - panic("empty namespace") + job.Namespace = structs.DefaultNamespace } + allocs, err := txn.Get("allocs", "job", job.Namespace, job.ID) if err != nil { return "", err @@ -2765,8 +2855,9 @@ func (s *StateStore) getJobStatus(txn *memdb.Txn, job *structs.Job, evalDelete b func (s *StateStore) updateSummaryWithJob(index uint64, job *structs.Job, txn *memdb.Txn) error { + // COMPAT 0.7: Upgrade old objects that do not have namespaces if job.Namespace == "" { - panic("empty namespace") + job.Namespace = structs.DefaultNamespace } // Update the job summary @@ -2808,6 +2899,11 @@ func (s *StateStore) updateSummaryWithJob(index uint64, job *structs.Job, if hasSummaryChanged { summary.ModifyIndex = index + // COMPAT 0.7: Upgrade old objects that do not have namespaces + if summary.Namespace == "" { + summary.Namespace = structs.DefaultNamespace + } + // Update the indexes table for job summary if err := txn.Insert("index", &IndexEntry{"job_summary", index}); err != nil { return fmt.Errorf("index update failed: %v", err) @@ -2908,8 +3004,9 @@ func (s *StateStore) updateSummaryWithAlloc(index uint64, alloc *structs.Allocat if alloc.Job == nil { return nil } + // COMPAT 0.7: Upgrade old objects that do not have namespaces if alloc.Namespace == "" { - panic("empty namespace") + alloc.Namespace = structs.DefaultNamespace } summaryRaw, err := txn.First("job_summary", "id", alloc.Namespace, alloc.JobID) @@ -3000,6 +3097,11 @@ func (s *StateStore) updateSummaryWithAlloc(index uint64, alloc *structs.Allocat if summaryChanged { jobSummary.ModifyIndex = index + // COMPAT 0.7: Upgrade old objects that do not have namespaces + if jobSummary.Namespace == "" { + jobSummary.Namespace = structs.DefaultNamespace + } + // Update the indexes table for job summary if err := txn.Insert("index", &IndexEntry{"job_summary", index}); err != nil { return fmt.Errorf("index update failed: %v", err)