From 74dc8fd46076dbc6e351b6fb472a481289e1da11 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Mon, 19 Mar 2018 10:12:12 -0700 Subject: [PATCH] JobNs -> NamespacedID Also drop the New func as it's easy to swap the order of arguments since they're both strings. --- nomad/drainer/draining_node.go | 8 ++++---- nomad/drainer/watch_jobs.go | 20 ++++++++++---------- nomad/drainer/watch_jobs_test.go | 4 ++-- nomad/structs/structs.go | 20 ++++---------------- 4 files changed, 20 insertions(+), 32 deletions(-) diff --git a/nomad/drainer/draining_node.go b/nomad/drainer/draining_node.go index 078399f04..af5c094b8 100644 --- a/nomad/drainer/draining_node.go +++ b/nomad/drainer/draining_node.go @@ -125,7 +125,7 @@ func (n *drainingNode) DeadlineAllocs() ([]*structs.Allocation, error) { } // RunningServices returns the set of jobs on the node -func (n *drainingNode) RunningServices() ([]structs.JobNs, error) { +func (n *drainingNode) RunningServices() ([]structs.NamespacedID, error) { n.l.RLock() defer n.l.RUnlock() @@ -135,14 +135,14 @@ func (n *drainingNode) RunningServices() ([]structs.JobNs, error) { return nil, err } - jobIDs := make(map[structs.JobNs]struct{}) - var jobs []structs.JobNs + jobIDs := make(map[structs.NamespacedID]struct{}) + var jobs []structs.NamespacedID for _, alloc := range allocs { if alloc.TerminalStatus() || alloc.Job.Type != structs.JobTypeService { continue } - jns := structs.NewJobNs(alloc.Namespace, alloc.JobID) + jns := structs.NamespacedID{Namespace: alloc.Namespace, ID: alloc.JobID} if _, ok := jobIDs[jns]; ok { continue } diff --git a/nomad/drainer/watch_jobs.go b/nomad/drainer/watch_jobs.go index 61a615646..181871b20 100644 --- a/nomad/drainer/watch_jobs.go +++ b/nomad/drainer/watch_jobs.go @@ -29,7 +29,7 @@ func NewDrainRequest(allocs []*structs.Allocation) *DrainRequest { // DrainingJobWatcher is the interface for watching a job drain type DrainingJobWatcher interface { // RegisterJob is used to start watching a draining job - RegisterJobs(job []structs.JobNs) + RegisterJobs(job []structs.NamespacedID) // Drain is used to emit allocations that should be drained. Drain() <-chan *DrainRequest @@ -52,7 +52,7 @@ type drainingJobWatcher struct { limiter *rate.Limiter // jobs is the set of tracked jobs. - jobs map[structs.JobNs]struct{} + jobs map[structs.NamespacedID]struct{} // queryCtx is used to cancel a blocking query. queryCtx context.Context @@ -80,7 +80,7 @@ func NewDrainingJobWatcher(ctx context.Context, limiter *rate.Limiter, state *st limiter: limiter, logger: logger, state: state, - jobs: make(map[structs.JobNs]struct{}, 64), + jobs: make(map[structs.NamespacedID]struct{}, 64), drainCh: make(chan *DrainRequest), migratedCh: make(chan []*structs.Allocation), } @@ -90,7 +90,7 @@ func NewDrainingJobWatcher(ctx context.Context, limiter *rate.Limiter, state *st } // RegisterJob marks the given job as draining and adds it to being watched. -func (w *drainingJobWatcher) RegisterJobs(jobs []structs.JobNs) { +func (w *drainingJobWatcher) RegisterJobs(jobs []structs.NamespacedID) { w.l.Lock() defer w.l.Unlock() @@ -129,7 +129,7 @@ func (w *drainingJobWatcher) Migrated() <-chan []*structs.Allocation { func (w *drainingJobWatcher) deregisterJob(jobID, namespace string) { w.l.Lock() defer w.l.Unlock() - jns := structs.JobNs{ + jns := structs.NamespacedID{ ID: jobID, Namespace: namespace, } @@ -409,7 +409,7 @@ func handleTaskGroup(snap *state.StateSnapshot, tg *structs.TaskGroup, } // getJobAllocs returns all allocations for draining jobs -func (w *drainingJobWatcher) getJobAllocs(ctx context.Context, minIndex uint64) (map[structs.JobNs][]*structs.Allocation, uint64, error) { +func (w *drainingJobWatcher) getJobAllocs(ctx context.Context, minIndex uint64) (map[structs.NamespacedID][]*structs.Allocation, uint64, error) { if err := w.limiter.Wait(ctx); err != nil { return nil, 0, err } @@ -422,7 +422,7 @@ func (w *drainingJobWatcher) getJobAllocs(ctx context.Context, minIndex uint64) return nil, index, nil } - return resp.(map[structs.JobNs][]*structs.Allocation), index, nil + return resp.(map[structs.NamespacedID][]*structs.Allocation), index, nil } // getJobAllocsImpl returns a map of draining jobs to their allocations. @@ -440,7 +440,7 @@ func (w *drainingJobWatcher) getJobAllocsImpl(ws memdb.WatchSet, state *state.St } // Capture the allocs for each draining job. - resp := make(map[structs.JobNs][]*structs.Allocation, l) + resp := make(map[structs.NamespacedID][]*structs.Allocation, l) for jns := range draining { allocs, err := state.AllocsByJob(ws, jns.Namespace, jns.ID, false) if err != nil { @@ -454,7 +454,7 @@ func (w *drainingJobWatcher) getJobAllocsImpl(ws memdb.WatchSet, state *state.St } // drainingJobs captures the set of draining jobs. -func (w *drainingJobWatcher) drainingJobs() map[structs.JobNs]struct{} { +func (w *drainingJobWatcher) drainingJobs() map[structs.NamespacedID]struct{} { w.l.RLock() defer w.l.RUnlock() @@ -463,7 +463,7 @@ func (w *drainingJobWatcher) drainingJobs() map[structs.JobNs]struct{} { return nil } - draining := make(map[structs.JobNs]struct{}, l) + draining := make(map[structs.NamespacedID]struct{}, l) for k := range w.jobs { draining[k] = struct{}{} } diff --git a/nomad/drainer/watch_jobs_test.go b/nomad/drainer/watch_jobs_test.go index 078e5316e..399ee46a1 100644 --- a/nomad/drainer/watch_jobs_test.go +++ b/nomad/drainer/watch_jobs_test.go @@ -75,12 +75,12 @@ func TestDrainingJobWatcher_DrainJobs(t *testing.T) { } // 2 jobs with count 10, max parallel 3 - jnss := make([]structs.JobNs, 2) + jnss := make([]structs.NamespacedID, 2) jobs := make([]*structs.Job, 2) for i := 0; i < 2; i++ { job := mock.Job() jobs[i] = job - jnss[i] = structs.NewJobNs(job.Namespace, job.ID) + jnss[i] = structs.NamespacedID{Namespace: job.Namespace, ID: job.ID} job.TaskGroups[0].Migrate.MaxParallel = 3 job.TaskGroups[0].Count = count require.Nil(state.UpsertJob(index, job)) diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index fa16284ab..307cce7fa 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -156,6 +156,10 @@ type NamespacedID struct { Namespace string } +func (n NamespacedID) String() string { + return fmt.Sprintf("", n.Namespace, n.ID) +} + // RPCInfo is used to describe common information about query type RPCInfo interface { RequestRegion() string @@ -1792,22 +1796,6 @@ func (n *NetworkResource) PortLabels() map[string]int { return labelValues } -// JobNs is a Job.ID and Namespace tuple -type JobNs struct { - Namespace, ID string -} - -func NewJobNs(namespace, id string) JobNs { - return JobNs{ - Namespace: namespace, - ID: id, - } -} - -func (j JobNs) String() string { - return fmt.Sprintf("", j.Namespace, j.ID) -} - const ( // JobTypeNomad is reserved for internal system tasks and is // always handled by the CoreScheduler.