JobNs -> NamespacedID

Also drop the New func as it's easy to swap the order of arguments since
they're both strings.
This commit is contained in:
Michael Schurter
2018-03-19 10:12:12 -07:00
parent 8217ebf11e
commit 74dc8fd460
4 changed files with 20 additions and 32 deletions

View File

@@ -125,7 +125,7 @@ func (n *drainingNode) DeadlineAllocs() ([]*structs.Allocation, error) {
}
// RunningServices returns the set of jobs on the node
func (n *drainingNode) RunningServices() ([]structs.JobNs, error) {
func (n *drainingNode) RunningServices() ([]structs.NamespacedID, error) {
n.l.RLock()
defer n.l.RUnlock()
@@ -135,14 +135,14 @@ func (n *drainingNode) RunningServices() ([]structs.JobNs, error) {
return nil, err
}
jobIDs := make(map[structs.JobNs]struct{})
var jobs []structs.JobNs
jobIDs := make(map[structs.NamespacedID]struct{})
var jobs []structs.NamespacedID
for _, alloc := range allocs {
if alloc.TerminalStatus() || alloc.Job.Type != structs.JobTypeService {
continue
}
jns := structs.NewJobNs(alloc.Namespace, alloc.JobID)
jns := structs.NamespacedID{Namespace: alloc.Namespace, ID: alloc.JobID}
if _, ok := jobIDs[jns]; ok {
continue
}

View File

@@ -29,7 +29,7 @@ func NewDrainRequest(allocs []*structs.Allocation) *DrainRequest {
// DrainingJobWatcher is the interface for watching a job drain
type DrainingJobWatcher interface {
// RegisterJob is used to start watching a draining job
RegisterJobs(job []structs.JobNs)
RegisterJobs(job []structs.NamespacedID)
// Drain is used to emit allocations that should be drained.
Drain() <-chan *DrainRequest
@@ -52,7 +52,7 @@ type drainingJobWatcher struct {
limiter *rate.Limiter
// jobs is the set of tracked jobs.
jobs map[structs.JobNs]struct{}
jobs map[structs.NamespacedID]struct{}
// queryCtx is used to cancel a blocking query.
queryCtx context.Context
@@ -80,7 +80,7 @@ func NewDrainingJobWatcher(ctx context.Context, limiter *rate.Limiter, state *st
limiter: limiter,
logger: logger,
state: state,
jobs: make(map[structs.JobNs]struct{}, 64),
jobs: make(map[structs.NamespacedID]struct{}, 64),
drainCh: make(chan *DrainRequest),
migratedCh: make(chan []*structs.Allocation),
}
@@ -90,7 +90,7 @@ func NewDrainingJobWatcher(ctx context.Context, limiter *rate.Limiter, state *st
}
// RegisterJob marks the given job as draining and adds it to being watched.
func (w *drainingJobWatcher) RegisterJobs(jobs []structs.JobNs) {
func (w *drainingJobWatcher) RegisterJobs(jobs []structs.NamespacedID) {
w.l.Lock()
defer w.l.Unlock()
@@ -129,7 +129,7 @@ func (w *drainingJobWatcher) Migrated() <-chan []*structs.Allocation {
func (w *drainingJobWatcher) deregisterJob(jobID, namespace string) {
w.l.Lock()
defer w.l.Unlock()
jns := structs.JobNs{
jns := structs.NamespacedID{
ID: jobID,
Namespace: namespace,
}
@@ -409,7 +409,7 @@ func handleTaskGroup(snap *state.StateSnapshot, tg *structs.TaskGroup,
}
// getJobAllocs returns all allocations for draining jobs
func (w *drainingJobWatcher) getJobAllocs(ctx context.Context, minIndex uint64) (map[structs.JobNs][]*structs.Allocation, uint64, error) {
func (w *drainingJobWatcher) getJobAllocs(ctx context.Context, minIndex uint64) (map[structs.NamespacedID][]*structs.Allocation, uint64, error) {
if err := w.limiter.Wait(ctx); err != nil {
return nil, 0, err
}
@@ -422,7 +422,7 @@ func (w *drainingJobWatcher) getJobAllocs(ctx context.Context, minIndex uint64)
return nil, index, nil
}
return resp.(map[structs.JobNs][]*structs.Allocation), index, nil
return resp.(map[structs.NamespacedID][]*structs.Allocation), index, nil
}
// getJobAllocsImpl returns a map of draining jobs to their allocations.
@@ -440,7 +440,7 @@ func (w *drainingJobWatcher) getJobAllocsImpl(ws memdb.WatchSet, state *state.St
}
// Capture the allocs for each draining job.
resp := make(map[structs.JobNs][]*structs.Allocation, l)
resp := make(map[structs.NamespacedID][]*structs.Allocation, l)
for jns := range draining {
allocs, err := state.AllocsByJob(ws, jns.Namespace, jns.ID, false)
if err != nil {
@@ -454,7 +454,7 @@ func (w *drainingJobWatcher) getJobAllocsImpl(ws memdb.WatchSet, state *state.St
}
// drainingJobs captures the set of draining jobs.
func (w *drainingJobWatcher) drainingJobs() map[structs.JobNs]struct{} {
func (w *drainingJobWatcher) drainingJobs() map[structs.NamespacedID]struct{} {
w.l.RLock()
defer w.l.RUnlock()
@@ -463,7 +463,7 @@ func (w *drainingJobWatcher) drainingJobs() map[structs.JobNs]struct{} {
return nil
}
draining := make(map[structs.JobNs]struct{}, l)
draining := make(map[structs.NamespacedID]struct{}, l)
for k := range w.jobs {
draining[k] = struct{}{}
}

View File

@@ -75,12 +75,12 @@ func TestDrainingJobWatcher_DrainJobs(t *testing.T) {
}
// 2 jobs with count 10, max parallel 3
jnss := make([]structs.JobNs, 2)
jnss := make([]structs.NamespacedID, 2)
jobs := make([]*structs.Job, 2)
for i := 0; i < 2; i++ {
job := mock.Job()
jobs[i] = job
jnss[i] = structs.NewJobNs(job.Namespace, job.ID)
jnss[i] = structs.NamespacedID{Namespace: job.Namespace, ID: job.ID}
job.TaskGroups[0].Migrate.MaxParallel = 3
job.TaskGroups[0].Count = count
require.Nil(state.UpsertJob(index, job))

View File

@@ -156,6 +156,10 @@ type NamespacedID struct {
Namespace string
}
func (n NamespacedID) String() string {
return fmt.Sprintf("<ns: %q, id: %q>", n.Namespace, n.ID)
}
// RPCInfo is used to describe common information about query
type RPCInfo interface {
RequestRegion() string
@@ -1792,22 +1796,6 @@ func (n *NetworkResource) PortLabels() map[string]int {
return labelValues
}
// JobNs is a Job.ID and Namespace tuple
type JobNs struct {
Namespace, ID string
}
func NewJobNs(namespace, id string) JobNs {
return JobNs{
Namespace: namespace,
ID: id,
}
}
func (j JobNs) String() string {
return fmt.Sprintf("<ns: %q, id: %q>", j.Namespace, j.ID)
}
const (
// JobTypeNomad is reserved for internal system tasks and is
// always handled by the CoreScheduler.