diff --git a/nomad/alloc_endpoint.go b/nomad/alloc_endpoint.go
index 9826a1547..47514b023 100644
--- a/nomad/alloc_endpoint.go
+++ b/nomad/alloc_endpoint.go
@@ -5,8 +5,8 @@ import (
 	"github.com/armon/go-metrics"
 	"github.com/hashicorp/go-memdb"
+	"github.com/hashicorp/nomad/nomad/state"
 	"github.com/hashicorp/nomad/nomad/structs"
-	"github.com/hashicorp/nomad/nomad/watch"
 )

 // Alloc endpoint is used for manipulating allocations
@@ -25,18 +25,14 @@ func (a *Alloc) List(args *structs.AllocListRequest, reply *structs.AllocListRes
 	opts := blockingOptions{
 		queryOpts: &args.QueryOptions,
 		queryMeta: &reply.QueryMeta,
-		watch:     watch.NewItems(watch.Item{Table: "allocs"}),
-		run: func() error {
+		run: func(ws memdb.WatchSet, state *state.StateStore) error {
 			// Capture all the allocations
-			snap, err := a.srv.fsm.State().Snapshot()
-			if err != nil {
-				return err
-			}
+			var err error
 			var iter memdb.ResultIterator
 			if prefix := args.QueryOptions.Prefix; prefix != "" {
-				iter, err = snap.AllocsByIDPrefix(prefix)
+				iter, err = state.AllocsByIDPrefix(ws, prefix)
 			} else {
-				iter, err = snap.Allocs()
+				iter, err = state.Allocs(ws)
 			}
 			if err != nil {
 				return err
 			}
@@ -54,7 +50,7 @@ func (a *Alloc) List(args *structs.AllocListRequest, reply *structs.AllocListRes
 			reply.Allocations = allocs

 			// Use the last index that affected the jobs table
-			index, err := snap.Index("allocs")
+			index, err := state.Index("allocs")
 			if err != nil {
 				return err
 			}
@@ -79,14 +75,9 @@ func (a *Alloc) GetAlloc(args *structs.AllocSpecificRequest,
 	opts := blockingOptions{
 		queryOpts: &args.QueryOptions,
 		queryMeta: &reply.QueryMeta,
-		watch:     watch.NewItems(watch.Item{Alloc: args.AllocID}),
-		run: func() error {
+		run: func(ws memdb.WatchSet, state *state.StateStore) error {
 			// Lookup the allocation
-			snap, err := a.srv.fsm.State().Snapshot()
-			if err != nil {
-				return err
-			}
-			out, err := snap.AllocByID(args.AllocID)
+			out, err := state.AllocByID(ws, args.AllocID)
 			if err != nil {
 				return err
 			}
@@ -97,7 +88,7 @@ func (a *Alloc) GetAlloc(args *structs.AllocSpecificRequest,
 				reply.Index = out.ModifyIndex
 			} else {
 				// Use the last index that affected the allocs table
-				index, err := snap.Index("allocs")
+				index, err := state.Index("allocs")
 				if err != nil {
 					return err
 				}
@@ -119,12 +110,6 @@ func (a *Alloc) GetAllocs(args *structs.AllocsGetRequest,
 	}
 	defer metrics.MeasureSince([]string{"nomad", "alloc", "get_allocs"}, time.Now())

-	// Build the watch
-	items := make([]watch.Item, 0, len(args.AllocIDs))
-	for _, allocID := range args.AllocIDs {
-		items = append(items, watch.Item{Alloc: allocID})
-	}
-
 	allocs := make([]*structs.Allocation, len(args.AllocIDs))

 	// Setup the blocking query. We wait for at least one of the requested
@@ -133,18 +118,12 @@ func (a *Alloc) GetAllocs(args *structs.AllocsGetRequest,
 	opts := blockingOptions{
 		queryOpts: &args.QueryOptions,
 		queryMeta: &reply.QueryMeta,
-		watch:     watch.NewItems(items...),
-		run: func() error {
+		run: func(ws memdb.WatchSet, state *state.StateStore) error {
 			// Lookup the allocation
-			snap, err := a.srv.fsm.State().Snapshot()
-			if err != nil {
-				return err
-			}
-
 			thresholdMet := false
 			maxIndex := uint64(0)
 			for i, alloc := range args.AllocIDs {
-				out, err := snap.AllocByID(alloc)
+				out, err := state.AllocByID(ws, alloc)
 				if err != nil {
 					return err
 				}
@@ -173,7 +152,7 @@ func (a *Alloc) GetAllocs(args *structs.AllocsGetRequest,
 				reply.Index = maxIndex
 			} else {
 				// Use the last index that affected the nodes table
-				index, err := snap.Index("allocs")
+				index, err := state.Index("allocs")
 				if err != nil {
 					return err
 				}
diff --git a/nomad/core_sched.go b/nomad/core_sched.go
index 8cb170045..e5dbea5a7 100644
--- a/nomad/core_sched.go
+++ b/nomad/core_sched.go
@@ -5,6 +5,7 @@ import (
 	"math"
 	"time"

+	memdb "github.com/hashicorp/go-memdb"
 	"github.com/hashicorp/nomad/nomad/state"
 	"github.com/hashicorp/nomad/nomad/structs"
 	"github.com/hashicorp/nomad/scheduler"
@@ -67,7 +68,8 @@ func (c *CoreScheduler) forceGC(eval *structs.Evaluation) error {
 // jobGC is used to garbage collect eligible jobs.
 func (c *CoreScheduler) jobGC(eval *structs.Evaluation) error {
 	// Get all the jobs eligible for garbage collection.
-	iter, err := c.snap.JobsByGC(true)
+	ws := memdb.NewWatchSet()
+	iter, err := c.snap.JobsByGC(ws, true)
 	if err != nil {
 		return err
 	}
@@ -99,7 +101,8 @@ OUTER:
 			continue
 		}
-		evals, err := c.snap.EvalsByJob(job.ID)
+		ws := memdb.NewWatchSet()
+		evals, err := c.snap.EvalsByJob(ws, job.ID)
 		if err != nil {
 			c.srv.logger.Printf("[ERR] sched.core: failed to get evals for job %s: %v", job.ID, err)
 			continue
 		}
@@ -163,7 +166,8 @@ OUTER:
 // evalGC is used to garbage collect old evaluations
 func (c *CoreScheduler) evalGC(eval *structs.Evaluation) error {
 	// Iterate over the evaluations
-	iter, err := c.snap.Evals()
+	ws := memdb.NewWatchSet()
+	iter, err := c.snap.Evals(ws)
 	if err != nil {
 		return err
 	}
@@ -227,6 +231,9 @@ func (c *CoreScheduler) gcEval(eval *structs.Evaluation, thresholdIndex uint64,
 		return false, nil, nil
 	}

+	// Create a watchset
+	ws := memdb.NewWatchSet()
+
 	// If the eval is from a running "batch" job we don't want to garbage
 	// collect its allocations. If there is a long running batch job and its
 	// terminal allocations get GC'd the scheduler would re-run the
@@ -237,7 +244,7 @@ func (c *CoreScheduler) gcEval(eval *structs.Evaluation, thresholdIndex uint64,
 	}

 	// Check if the job is running
-	job, err := c.snap.JobByID(eval.JobID)
+	job, err := c.snap.JobByID(ws, eval.JobID)
 	if err != nil {
 		return false, nil, err
 	}
@@ -249,7 +256,7 @@ func (c *CoreScheduler) gcEval(eval *structs.Evaluation, thresholdIndex uint64,
 	}

 	// Get the allocations by eval
-	allocs, err := c.snap.AllocsByEval(eval.ID)
+	allocs, err := c.snap.AllocsByEval(ws, eval.ID)
 	if err != nil {
 		c.srv.logger.Printf("[ERR] sched.core: failed to get allocs for eval %s: %v",
 			eval.ID, err)
@@ -336,7 +343,8 @@ func (c *CoreScheduler) partitionReap(evals, allocs []string) []*structs.EvalDel
 // nodeGC is used to garbage collect old nodes
 func (c *CoreScheduler) nodeGC(eval *structs.Evaluation) error {
 	// Iterate over the evaluations
-	iter, err := c.snap.Nodes()
+	ws := memdb.NewWatchSet()
+	iter, err := c.snap.Nodes(ws)
 	if err != nil {
 		return err
 	}
@@ -374,7 +382,8 @@ OUTER:
 		}

 		// Get the allocations by node
-		allocs, err := c.snap.AllocsByNode(node.ID)
+		ws := memdb.NewWatchSet()
+		allocs, err := c.snap.AllocsByNode(ws, node.ID)
 		if err != nil {
 			c.srv.logger.Printf("[ERR] sched.core: failed to get allocs for node %s: %v",
 				eval.ID, err)
diff --git a/nomad/eval_endpoint.go b/nomad/eval_endpoint.go
index 32ea14faa..6f8f404f0 100644
--- a/nomad/eval_endpoint.go
+++ b/nomad/eval_endpoint.go
@@ -6,8 +6,8 @@ import (
 	"github.com/armon/go-metrics"
 	"github.com/hashicorp/go-memdb"
+	"github.com/hashicorp/nomad/nomad/state"
 	"github.com/hashicorp/nomad/nomad/structs"
-	"github.com/hashicorp/nomad/nomad/watch"
 	"github.com/hashicorp/nomad/scheduler"
 )

@@ -33,14 +33,9 @@ func (e *Eval) GetEval(args *structs.EvalSpecificRequest,
 	opts := blockingOptions{
 		queryOpts: &args.QueryOptions,
 		queryMeta: &reply.QueryMeta,
-		watch:     watch.NewItems(watch.Item{Eval: args.EvalID}),
-		run: func() error {
+		run: func(ws memdb.WatchSet, state *state.StateStore) error {
 			// Look for the job
-			snap, err := e.srv.fsm.State().Snapshot()
-			if err != nil {
-				return err
-			}
-			out, err := snap.EvalByID(args.EvalID)
+			out, err := state.EvalByID(ws, args.EvalID)
 			if err != nil {
 				return err
 			}
@@ -51,7 +46,7 @@ func (e *Eval) GetEval(args *structs.EvalSpecificRequest,
 				reply.Index = out.ModifyIndex
 			} else {
 				// Use the last index that affected the nodes table
-				index, err := snap.Index("evals")
+				index, err := state.Index("evals")
 				if err != nil {
 					return err
 				}
@@ -190,7 +185,9 @@ func (e *Eval) Create(args *structs.EvalUpdateRequest,
 	if err != nil {
 		return err
 	}
-	out, err := snap.EvalByID(eval.ID)
+
+	ws := memdb.NewWatchSet()
+	out, err := snap.EvalByID(ws, eval.ID)
 	if err != nil {
 		return err
 	}
@@ -233,7 +230,9 @@ func (e *Eval) Reblock(args *structs.EvalUpdateRequest, reply *structs.GenericRe
 	if err != nil {
 		return err
 	}
-	out, err := snap.EvalByID(eval.ID)
+
+	ws := memdb.NewWatchSet()
+	out, err := snap.EvalByID(ws, eval.ID)
 	if err != nil {
 		return err
 	}
@@ -280,18 +279,14 @@ func (e *Eval) List(args *structs.EvalListRequest,
 	opts := blockingOptions{
 		queryOpts: &args.QueryOptions,
 		queryMeta: &reply.QueryMeta,
-		watch:     watch.NewItems(watch.Item{Table: "evals"}),
-		run: func() error {
+		run: func(ws memdb.WatchSet, state *state.StateStore) error {
 			// Scan all the evaluations
-			snap, err := e.srv.fsm.State().Snapshot()
-			if err != nil {
-				return err
-			}
+			var err error
 			var iter memdb.ResultIterator
 			if prefix := args.QueryOptions.Prefix; prefix != "" {
-				iter, err = snap.EvalsByIDPrefix(prefix)
+				iter, err = state.EvalsByIDPrefix(ws, prefix)
 			} else {
-				iter, err = snap.Evals()
+				iter, err = state.Evals(ws)
 			}
 			if err != nil {
 				return err
@@ -309,7 +304,7 @@ func (e *Eval) List(args *structs.EvalListRequest,
 			reply.Evaluations = evals

 			// Use the last index that affected the jobs table
-			index, err := snap.Index("evals")
+			index, err := state.Index("evals")
 			if err != nil {
 				return err
 			}
@@ -334,14 +329,9 @@ func (e *Eval) Allocations(args *structs.EvalSpecificRequest,
 	opts := blockingOptions{
 		queryOpts: &args.QueryOptions,
 		queryMeta: &reply.QueryMeta,
-		watch:     watch.NewItems(watch.Item{AllocEval: args.EvalID}),
-		run: func() error {
+		run: func(ws memdb.WatchSet, state *state.StateStore) error {
 			// Capture the allocations
-			snap, err := e.srv.fsm.State().Snapshot()
-			if err != nil {
-				return err
-			}
-			allocs, err := snap.AllocsByEval(args.EvalID)
+			allocs, err := state.AllocsByEval(ws, args.EvalID)
 			if err != nil {
 				return err
 			}
@@ -355,7 +345,7 @@ func (e *Eval) Allocations(args *structs.EvalSpecificRequest,
 			}

 			// Use the last index that affected the allocs table
-			index, err := snap.Index("allocs")
+			index, err := state.Index("allocs")
 			if err != nil {
 				return err
 			}
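The endpoint hunks above all funnel through the same new primitive. Below is a standalone sketch of that primitive using only go-memdb's public API — none of this is Nomad code, and the table and ID names are invented: a read records the channels it touched in a `memdb.WatchSet`, and `WatchSet.Watch` parks the caller until one of them fires or the timeout channel delivers.

```go
package main

import (
	"fmt"
	"time"

	memdb "github.com/hashicorp/go-memdb"
)

// Alloc is a stand-in record type for the sketch.
type Alloc struct {
	ID string
}

func main() {
	// One table, indexed by ID.
	db, err := memdb.NewMemDB(&memdb.DBSchema{
		Tables: map[string]*memdb.TableSchema{
			"allocs": {
				Name: "allocs",
				Indexes: map[string]*memdb.IndexSchema{
					"id": {Name: "id", Unique: true, Indexer: &memdb.StringFieldIndex{Field: "ID"}},
				},
			},
		},
	})
	if err != nil {
		panic(err)
	}

	// Read miss: FirstWatch still hands back a channel that fires when the
	// looked-up entry (or its index range) changes.
	ws := memdb.NewWatchSet()
	txn := db.Txn(false)
	watchCh, _, err := txn.FirstWatch("allocs", "id", "a1")
	if err != nil {
		panic(err)
	}
	ws.Add(watchCh)

	// A later write to the watched data wakes the blocked reader.
	go func() {
		time.Sleep(50 * time.Millisecond)
		write := db.Txn(true)
		if err := write.Insert("allocs", &Alloc{ID: "a1"}); err != nil {
			panic(err)
		}
		write.Commit()
	}()

	// Block the way a blocking query does: false means "woken, re-run".
	if expired := ws.Watch(time.After(time.Second)); !expired {
		fmt.Println("state changed; re-run the query")
	}
}
```

This is what lets the rewrite delete the hand-maintained `watch.Items` plumbing: invalidation now falls out of the indexes each query actually read.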
diff --git a/nomad/fsm.go b/nomad/fsm.go
index c3270ff00..edca4de95 100644
--- a/nomad/fsm.go
+++ b/nomad/fsm.go
@@ -9,6 +9,7 @@ import (
 	"time"

 	"github.com/armon/go-metrics"
+	memdb "github.com/hashicorp/go-memdb"
 	"github.com/hashicorp/nomad/nomad/state"
 	"github.com/hashicorp/nomad/nomad/structs"
 	"github.com/hashicorp/nomad/scheduler"
@@ -212,7 +213,8 @@ func (n *nomadFSM) applyStatusUpdate(buf []byte, index uint64) interface{} {
 	// Unblock evals for the nodes computed node class if it is in a ready
 	// state.
 	if req.Status == structs.NodeStatusReady {
-		node, err := n.state.NodeByID(req.NodeID)
+		ws := memdb.NewWatchSet()
+		node, err := n.state.NodeByID(ws, req.NodeID)
 		if err != nil {
 			n.logger.Printf("[ERR] nomad.fsm: looking up node %q failed: %v", req.NodeID, err)
 			return err
 		}
@@ -265,13 +267,16 @@ func (n *nomadFSM) applyUpsertJob(buf []byte, index uint64) interface{} {
 		return err
 	}
+	// Create a watch set
+	ws := memdb.NewWatchSet()
+
 	// If it is periodic, record the time it was inserted. This is necessary for
 	// recovering during leader election. It is possible that from the time it
 	// is added to when it was suppose to launch, leader election occurs and the
 	// job was not launched. In this case, we use the insertion time to
 	// determine if a launch was missed.
 	if req.Job.IsPeriodic() {
-		prevLaunch, err := n.state.PeriodicLaunchByID(req.Job.ID)
+		prevLaunch, err := n.state.PeriodicLaunchByID(ws, req.Job.ID)
 		if err != nil {
 			n.logger.Printf("[ERR] nomad.fsm: PeriodicLaunchByID failed: %v", err)
 			return err
 		}
@@ -291,7 +296,7 @@ func (n *nomadFSM) applyUpsertJob(buf []byte, index uint64) interface{} {
 	// Check if the parent job is periodic and mark the launch time.
 	parentID := req.Job.ParentID
 	if parentID != "" {
-		parent, err := n.state.JobByID(parentID)
+		parent, err := n.state.JobByID(ws, parentID)
 		if err != nil {
 			n.logger.Printf("[ERR] nomad.fsm: JobByID(%v) lookup for parent failed: %v", parentID, err)
 			return err
 		}
@@ -444,9 +449,12 @@ func (n *nomadFSM) applyAllocClientUpdate(buf []byte, index uint64) interface{}
 		return nil
 	}
+	// Create a watch set
+	ws := memdb.NewWatchSet()
+
 	// Updating the allocs with the job id and task group name
 	for _, alloc := range req.Alloc {
-		if existing, _ := n.state.AllocByID(alloc.ID); existing != nil {
+		if existing, _ := n.state.AllocByID(ws, alloc.ID); existing != nil {
 			alloc.JobID = existing.JobID
 			alloc.TaskGroup = existing.TaskGroup
 		}
 	}
@@ -464,7 +472,7 @@ func (n *nomadFSM) applyAllocClientUpdate(buf []byte, index uint64) interface{}
 		if alloc.ClientStatus == structs.AllocClientStatusComplete ||
 			alloc.ClientStatus == structs.AllocClientStatusFailed {
 			nodeID := alloc.NodeID
-			node, err := n.state.NodeByID(nodeID)
+			node, err := n.state.NodeByID(ws, nodeID)
 			if err != nil || node == nil {
 				n.logger.Printf("[ERR] nomad.fsm: looking up node %q failed: %v", nodeID, err)
 				return err
 			}
@@ -706,7 +714,8 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error {
 // created a Job Summary during the snap shot restore
 func (n *nomadFSM) reconcileQueuedAllocations(index uint64) error {
 	// Get all the jobs
-	iter, err := n.state.Jobs()
+	ws := memdb.NewWatchSet()
+	iter, err := n.state.Jobs(ws)
 	if err != nil {
 		return err
 	}
@@ -750,7 +759,7 @@ func (n *nomadFSM) reconcileQueuedAllocations(index uint64) error {
 		}

 		// Get the job summary from the fsm state store
-		originalSummary, err := n.state.JobSummaryByID(job.ID)
+		originalSummary, err := n.state.JobSummaryByID(ws, job.ID)
 		if err != nil {
 			return err
 		}
@@ -886,7 +895,8 @@ func (s *nomadSnapshot) persistIndexes(sink raft.SnapshotSink,
 func (s *nomadSnapshot) persistNodes(sink raft.SnapshotSink,
 	encoder *codec.Encoder) error {
 	// Get all the nodes
-	nodes, err := s.snap.Nodes()
+	ws := memdb.NewWatchSet()
+	nodes, err := s.snap.Nodes(ws)
 	if err != nil {
 		return err
 	}
@@ -913,7 +923,8 @@ func (s *nomadSnapshot) persistNodes(sink raft.SnapshotSink,
 func (s *nomadSnapshot) persistJobs(sink raft.SnapshotSink,
 	encoder *codec.Encoder) error {
 	// Get all the jobs
-	jobs, err := s.snap.Jobs()
+	ws := memdb.NewWatchSet()
+	jobs, err := s.snap.Jobs(ws)
 	if err != nil {
 		return err
 	}
@@ -940,7 +951,8 @@ func (s *nomadSnapshot) persistJobs(sink raft.SnapshotSink,
 func (s *nomadSnapshot) persistEvals(sink raft.SnapshotSink,
 	encoder *codec.Encoder) error {
 	// Get all the evaluations
-	evals, err := s.snap.Evals()
+	ws := memdb.NewWatchSet()
+	evals, err := s.snap.Evals(ws)
 	if err != nil {
 		return err
 	}
@@ -967,7 +979,8 @@ func (s *nomadSnapshot) persistEvals(sink raft.SnapshotSink,
 func (s *nomadSnapshot) persistAllocs(sink raft.SnapshotSink,
 	encoder *codec.Encoder) error {
 	// Get all the allocations
-	allocs, err := s.snap.Allocs()
+	ws := memdb.NewWatchSet()
+	allocs, err := s.snap.Allocs(ws)
 	if err != nil {
 		return err
 	}
@@ -994,7 +1007,8 @@ func (s *nomadSnapshot) persistAllocs(sink raft.SnapshotSink,
 func (s *nomadSnapshot) persistPeriodicLaunches(sink raft.SnapshotSink,
 	encoder *codec.Encoder) error {
 	// Get all the jobs
-	launches, err := s.snap.PeriodicLaunches()
+	ws := memdb.NewWatchSet()
+	launches, err := s.snap.PeriodicLaunches(ws)
 	if err != nil {
 		return err
 	}
@@ -1021,7 +1035,8 @@ func (s *nomadSnapshot) persistJobSummaries(sink raft.SnapshotSink,
 	encoder *codec.Encoder) error {
-	summaries, err := s.snap.JobSummaries()
+	ws := memdb.NewWatchSet()
+	summaries, err := s.snap.JobSummaries(ws)
 	if err != nil {
 		return err
 	}
@@ -1045,7 +1060,8 @@ func (s *nomadSnapshot) persistJobSummaries(sink raft.SnapshotSink,
 func (s *nomadSnapshot) persistVaultAccessors(sink raft.SnapshotSink,
 	encoder *codec.Encoder) error {
-	accessors, err := s.snap.VaultAccessors()
+	ws := memdb.NewWatchSet()
+	accessors, err := s.snap.VaultAccessors(ws)
 	if err != nil {
 		return err
 	}
diff --git a/nomad/heartbeat.go b/nomad/heartbeat.go
index 9b2867eca..89bc86010 100644
--- a/nomad/heartbeat.go
+++ b/nomad/heartbeat.go
@@ -5,6 +5,7 @@ import (
 	"github.com/armon/go-metrics"
 	"github.com/hashicorp/consul/lib"
+	memdb "github.com/hashicorp/go-memdb"
 	"github.com/hashicorp/nomad/nomad/structs"
 )
@@ -19,7 +20,8 @@ func (s *Server) initializeHeartbeatTimers() error {
 	}

 	// Get an iterator over nodes
-	iter, err := snap.Nodes()
+	ws := memdb.NewWatchSet()
+	iter, err := snap.Nodes(ws)
 	if err != nil {
 		return err
 	}
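Note the contrast with the endpoint hunks: on apply and persist paths like the FSM and snapshot code above, nothing ever blocks, so each `memdb.NewWatchSet()` is a throwaway that exists only to satisfy the widened accessor signatures. A small sketch, assuming the go-memdb behavior of this era, where `WatchSet.Add` on a nil set is documented as a no-op:

```go
package main

import (
	"fmt"

	memdb "github.com/hashicorp/go-memdb"
)

func main() {
	// A nil WatchSet silently drops Add calls...
	var nilSet memdb.WatchSet
	nilSet.Add(make(chan struct{}))

	// ...while a throwaway set records channels that nobody will ever Watch.
	throwaway := memdb.NewWatchSet()
	throwaway.Add(make(chan struct{}))

	fmt.Println(len(nilSet), len(throwaway)) // 0 1
}
```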
diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go
index 992a06538..ce25d1ab2 100644
--- a/nomad/job_endpoint.go
+++ b/nomad/job_endpoint.go
@@ -13,8 +13,8 @@ import (
 	"github.com/hashicorp/go-multierror"
 	"github.com/hashicorp/nomad/client/driver"
 	"github.com/hashicorp/nomad/helper"
+	"github.com/hashicorp/nomad/nomad/state"
 	"github.com/hashicorp/nomad/nomad/structs"
-	"github.com/hashicorp/nomad/nomad/watch"
 	"github.com/hashicorp/nomad/scheduler"
 )

@@ -72,7 +72,8 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis
 	if err != nil {
 		return err
 	}
-	job, err := snap.JobByID(args.Job.ID)
+	ws := memdb.NewWatchSet()
+	job, err := snap.JobByID(ws, args.Job.ID)
 	if err != nil {
 		return err
 	}
@@ -257,15 +258,9 @@ func (j *Job) Summary(args *structs.JobSummaryRequest,
 	opts := blockingOptions{
 		queryOpts: &args.QueryOptions,
 		queryMeta: &reply.QueryMeta,
-		watch:     watch.NewItems(watch.Item{JobSummary: args.JobID}),
-		run: func() error {
-			snap, err := j.srv.fsm.State().Snapshot()
-			if err != nil {
-				return err
-			}
-
+		run: func(ws memdb.WatchSet, state *state.StateStore) error {
 			// Look for job summary
-			out, err := snap.JobSummaryByID(args.JobID)
+			out, err := state.JobSummaryByID(ws, args.JobID)
 			if err != nil {
 				return err
 			}
@@ -276,7 +271,7 @@ func (j *Job) Summary(args *structs.JobSummaryRequest,
 				reply.Index = out.ModifyIndex
 			} else {
 				// Use the last index that affected the job_summary table
-				index, err := snap.Index("job_summary")
+				index, err := state.Index("job_summary")
 				if err != nil {
 					return err
 				}
@@ -307,7 +302,8 @@ func (j *Job) Evaluate(args *structs.JobEvaluateRequest, reply *structs.JobRegis
 	if err != nil {
 		return err
 	}
-	job, err := snap.JobByID(args.JobID)
+	ws := memdb.NewWatchSet()
+	job, err := snap.JobByID(ws, args.JobID)
 	if err != nil {
 		return err
 	}
@@ -368,7 +364,8 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD
 	if err != nil {
 		return err
 	}
-	job, err := snap.JobByID(args.JobID)
+	ws := memdb.NewWatchSet()
+	job, err := snap.JobByID(ws, args.JobID)
 	if err != nil {
 		return err
 	}
@@ -432,15 +429,9 @@ func (j *Job) GetJob(args *structs.JobSpecificRequest,
 	opts := blockingOptions{
 		queryOpts: &args.QueryOptions,
 		queryMeta: &reply.QueryMeta,
-		watch:     watch.NewItems(watch.Item{Job: args.JobID}),
-		run: func() error {
-
+		run: func(ws memdb.WatchSet, state *state.StateStore) error {
 			// Look for the job
-			snap, err := j.srv.fsm.State().Snapshot()
-			if err != nil {
-				return err
-			}
-			out, err := snap.JobByID(args.JobID)
+			out, err := state.JobByID(ws, args.JobID)
 			if err != nil {
 				return err
 			}
@@ -451,7 +442,7 @@ func (j *Job) GetJob(args *structs.JobSpecificRequest,
 				reply.Index = out.ModifyIndex
 			} else {
 				// Use the last index that affected the nodes table
-				index, err := snap.Index("jobs")
+				index, err := state.Index("jobs")
 				if err != nil {
 					return err
 				}
@@ -477,18 +468,14 @@ func (j *Job) List(args *structs.JobListRequest,
 	opts := blockingOptions{
 		queryOpts: &args.QueryOptions,
 		queryMeta: &reply.QueryMeta,
-		watch:     watch.NewItems(watch.Item{Table: "jobs"}),
-		run: func() error {
+		run: func(ws memdb.WatchSet, state *state.StateStore) error {
 			// Capture all the jobs
-			snap, err := j.srv.fsm.State().Snapshot()
-			if err != nil {
-				return err
-			}
+			var err error
 			var iter memdb.ResultIterator
 			if prefix := args.QueryOptions.Prefix; prefix != "" {
-				iter, err = snap.JobsByIDPrefix(prefix)
+				iter, err = state.JobsByIDPrefix(ws, prefix)
 			} else {
-				iter, err = snap.Jobs()
+				iter, err = state.Jobs(ws)
 			}
 			if err != nil {
 				return err
@@ -501,7 +488,7 @@ func (j *Job) List(args *structs.JobListRequest,
 					break
 				}
 				job := raw.(*structs.Job)
-				summary, err := snap.JobSummaryByID(job.ID)
+				summary, err := state.JobSummaryByID(ws, job.ID)
 				if err != nil {
 					return fmt.Errorf("unable to look up summary for job: %v", job.ID)
 				}
@@ -510,7 +497,7 @@ func (j *Job) List(args *structs.JobListRequest,
 			reply.Jobs = jobs

 			// Use the last index that affected the jobs table
-			index, err := snap.Index("jobs")
+			index, err := state.Index("jobs")
 			if err != nil {
 				return err
 			}
@@ -535,14 +522,9 @@ func (j *Job) Allocations(args *structs.JobSpecificRequest,
 	opts := blockingOptions{
 		queryOpts: &args.QueryOptions,
 		queryMeta: &reply.QueryMeta,
-		watch:     watch.NewItems(watch.Item{AllocJob: args.JobID}),
-		run: func() error {
+		run: func(ws memdb.WatchSet, state *state.StateStore) error {
 			// Capture the allocations
-			snap, err := j.srv.fsm.State().Snapshot()
-			if err != nil {
-				return err
-			}
-			allocs, err := snap.AllocsByJob(args.JobID, args.AllAllocs)
+			allocs, err := state.AllocsByJob(ws, args.JobID, args.AllAllocs)
 			if err != nil {
 				return err
 			}
@@ -556,7 +538,7 @@ func (j *Job) Allocations(args *structs.JobSpecificRequest,
 			}

 			// Use the last index that affected the allocs table
-			index, err := snap.Index("allocs")
+			index, err := state.Index("allocs")
 			if err != nil {
 				return err
 			}
@@ -582,21 +564,16 @@ func (j *Job) Evaluations(args *structs.JobSpecificRequest,
 	opts := blockingOptions{
 		queryOpts: &args.QueryOptions,
 		queryMeta: &reply.QueryMeta,
-		watch:     watch.NewItems(watch.Item{EvalJob: args.JobID}),
-		run: func() error {
+		run: func(ws memdb.WatchSet, state *state.StateStore) error {
 			// Capture the evals
-			snap, err := j.srv.fsm.State().Snapshot()
-			if err != nil {
-				return err
-			}
-
-			reply.Evaluations, err = snap.EvalsByJob(args.JobID)
+			var err error
+			reply.Evaluations, err = state.EvalsByJob(ws, args.JobID)
 			if err != nil {
 				return err
 			}

 			// Use the last index that affected the evals table
-			index, err := snap.Index("evals")
+			index, err := state.Index("evals")
 			if err != nil {
 				return err
 			}
@@ -641,7 +618,8 @@ func (j *Job) Plan(args *structs.JobPlanRequest, reply *structs.JobPlanResponse)
 	}

 	// Get the original job
-	oldJob, err := snap.JobByID(args.Job.ID)
+	ws := memdb.NewWatchSet()
+	oldJob, err := snap.JobByID(ws, args.Job.ID)
 	if err != nil {
 		return err
 	}
@@ -797,7 +775,8 @@ func (j *Job) Dispatch(args *structs.JobDispatchRequest, reply *structs.JobDispa
 	if err != nil {
 		return err
 	}
-	parameterizedJob, err := snap.JobByID(args.JobID)
+	ws := memdb.NewWatchSet()
+	parameterizedJob, err := snap.JobByID(ws, args.JobID)
 	if err != nil {
 		return err
 	}
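job_endpoint.go repeats one callback shape several times above (List, GetJob, Summary). What the new `state.StateStore` accessors presumably do underneath — shown here with plain go-memdb and invented table/type names, since the state package internals are not part of this diff — is run the query on a read transaction and attach the iterator's watch channel to the caller's set:

```go
package main

import (
	"fmt"

	memdb "github.com/hashicorp/go-memdb"
)

// Job is an invented record type for the sketch.
type Job struct {
	ID string
}

// jobsByIDPrefix mimics the presumed shape of a StateStore accessor: query,
// then register the iterator's watch channel in the caller's WatchSet so any
// insert or delete in that range wakes a blocked query.
func jobsByIDPrefix(db *memdb.MemDB, ws memdb.WatchSet, prefix string) (memdb.ResultIterator, error) {
	txn := db.Txn(false)
	iter, err := txn.Get("jobs", "id_prefix", prefix)
	if err != nil {
		return nil, err
	}
	ws.Add(iter.WatchCh())
	return iter, nil
}

func main() {
	db, err := memdb.NewMemDB(&memdb.DBSchema{
		Tables: map[string]*memdb.TableSchema{
			"jobs": {
				Name: "jobs",
				Indexes: map[string]*memdb.IndexSchema{
					"id": {Name: "id", Unique: true, Indexer: &memdb.StringFieldIndex{Field: "ID"}},
				},
			},
		},
	})
	if err != nil {
		panic(err)
	}

	txn := db.Txn(true)
	if err := txn.Insert("jobs", &Job{ID: "example-batch"}); err != nil {
		panic(err)
	}
	txn.Commit()

	ws := memdb.NewWatchSet()
	iter, err := jobsByIDPrefix(db, ws, "example")
	if err != nil {
		panic(err)
	}
	for raw := iter.Next(); raw != nil; raw = iter.Next() {
		fmt.Println(raw.(*Job).ID)
	}
	fmt.Println("watching", len(ws), "channel(s)")
}
```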
diff --git a/nomad/leader.go b/nomad/leader.go
index 3307608e6..dc9cd4231 100644
--- a/nomad/leader.go
+++ b/nomad/leader.go
@@ -7,6 +7,7 @@ import (
 	"time"

 	"github.com/armon/go-metrics"
+	memdb "github.com/hashicorp/go-memdb"
 	"github.com/hashicorp/nomad/nomad/structs"
 	"github.com/hashicorp/raft"
 	"github.com/hashicorp/serf/serf"
@@ -191,7 +192,8 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error {
 // a leadership transition takes place.
 func (s *Server) restoreEvals() error {
 	// Get an iterator over every evaluation
-	iter, err := s.fsm.State().Evals()
+	ws := memdb.NewWatchSet()
+	iter, err := s.fsm.State().Evals(ws)
 	if err != nil {
 		return fmt.Errorf("failed to get evaluations: %v", err)
 	}
@@ -216,8 +218,9 @@ func (s *Server) restoreEvals() error {
 // revoked.
 func (s *Server) restoreRevokingAccessors() error {
 	// An accessor should be revoked if its allocation or node is terminal
+	ws := memdb.NewWatchSet()
 	state := s.fsm.State()
-	iter, err := state.VaultAccessors()
+	iter, err := state.VaultAccessors(ws)
 	if err != nil {
 		return fmt.Errorf("failed to get vault accessors: %v", err)
 	}
@@ -232,7 +235,7 @@ func (s *Server) restoreRevokingAccessors() error {
 		va := raw.(*structs.VaultAccessor)

 		// Check the allocation
-		alloc, err := state.AllocByID(va.AllocID)
+		alloc, err := state.AllocByID(ws, va.AllocID)
 		if err != nil {
 			return fmt.Errorf("failed to lookup allocation: %v", va.AllocID, err)
 		}
@@ -243,7 +246,7 @@ func (s *Server) restoreRevokingAccessors() error {
 		}

 		// Check the node
-		node, err := state.NodeByID(va.NodeID)
+		node, err := state.NodeByID(ws, va.NodeID)
 		if err != nil {
 			return fmt.Errorf("failed to lookup node %q: %v", va.NodeID, err)
 		}
@@ -269,7 +272,8 @@ func (s *Server) restoreRevokingAccessors() error {
 // dispatcher is maintained only by the leader, so it must be restored anytime a
 // leadership transition takes place.
 func (s *Server) restorePeriodicDispatcher() error {
-	iter, err := s.fsm.State().JobsByPeriodic(true)
+	ws := memdb.NewWatchSet()
+	iter, err := s.fsm.State().JobsByPeriodic(ws, true)
 	if err != nil {
 		return fmt.Errorf("failed to get periodic jobs: %v", err)
 	}
@@ -282,7 +286,7 @@ func (s *Server) restorePeriodicDispatcher() error {
 		// If the periodic job has never been launched before, launch will hold
 		// the time the periodic job was added. Otherwise it has the last launch
 		// time of the periodic job.
-		launch, err := s.fsm.State().PeriodicLaunchByID(job.ID)
+		launch, err := s.fsm.State().PeriodicLaunchByID(ws, job.ID)
 		if err != nil || launch == nil {
 			return fmt.Errorf("failed to get periodic launch time: %v", err)
 		}
diff --git a/nomad/node_endpoint.go b/nomad/node_endpoint.go
index e69b85ffc..595da1609 100644
--- a/nomad/node_endpoint.go
+++ b/nomad/node_endpoint.go
@@ -14,7 +14,6 @@ import (
 	"github.com/hashicorp/go-multierror"
 	"github.com/hashicorp/nomad/nomad/state"
 	"github.com/hashicorp/nomad/nomad/structs"
-	"github.com/hashicorp/nomad/nomad/watch"
 	"github.com/hashicorp/raft"
 	vapi "github.com/hashicorp/vault/api"
 )
@@ -103,7 +102,9 @@ func (n *Node) Register(args *structs.NodeRegisterRequest, reply *structs.NodeUp
 	if err != nil {
 		return err
 	}
-	originalNode, err := snap.NodeByID(args.Node.ID)
+
+	ws := memdb.NewWatchSet()
+	originalNode, err := snap.NodeByID(ws, args.Node.ID)
 	if err != nil {
 		return err
 	}
@@ -203,7 +204,8 @@ func (n *Node) constructNodeServerInfoResponse(snap *state.StateSnapshot, reply
 	// Snapshot is used only to iterate over all nodes to create a node
 	// count to send back to Nomad Clients in their heartbeat so Clients
 	// can estimate the size of the cluster.
-	iter, err := snap.Nodes()
+	ws := memdb.NewWatchSet()
+	iter, err := snap.Nodes(ws)
 	if err == nil {
 		for {
 			raw := iter.Next()
@@ -248,7 +250,8 @@ func (n *Node) Deregister(args *structs.NodeDeregisterRequest, reply *structs.No
 	}

 	// Determine if there are any Vault accessors on the node
-	accessors, err := n.srv.State().VaultAccessorsByNode(args.NodeID)
+	ws := memdb.NewWatchSet()
+	accessors, err := n.srv.State().VaultAccessorsByNode(ws, args.NodeID)
 	if err != nil {
 		n.srv.logger.Printf("[ERR] nomad.client: looking up accessors for node %q failed: %v", args.NodeID, err)
 		return err
 	}
@@ -289,7 +292,9 @@ func (n *Node) UpdateStatus(args *structs.NodeUpdateStatusRequest, reply *struct
 	if err != nil {
 		return err
 	}
-	node, err := snap.NodeByID(args.NodeID)
+
+	ws := memdb.NewWatchSet()
+	node, err := snap.NodeByID(ws, args.NodeID)
 	if err != nil {
 		return err
 	}
@@ -330,7 +335,7 @@ func (n *Node) UpdateStatus(args *structs.NodeUpdateStatusRequest, reply *struct
 	switch args.Status {
 	case structs.NodeStatusDown:
 		// Determine if there are any Vault accessors on the node
-		accessors, err := n.srv.State().VaultAccessorsByNode(args.NodeID)
+		accessors, err := n.srv.State().VaultAccessorsByNode(ws, args.NodeID)
 		if err != nil {
 			n.srv.logger.Printf("[ERR] nomad.client: looking up accessors for node %q failed: %v", args.NodeID, err)
 			return err
 		}
@@ -389,7 +394,8 @@ func (n *Node) UpdateDrain(args *structs.NodeUpdateDrainRequest,
 	if err != nil {
 		return err
 	}
-	node, err := snap.NodeByID(args.NodeID)
+	ws := memdb.NewWatchSet()
+	node, err := snap.NodeByID(ws, args.NodeID)
 	if err != nil {
 		return err
 	}
@@ -443,7 +449,8 @@ func (n *Node) Evaluate(args *structs.NodeEvaluateRequest, reply *structs.NodeUp
 	if err != nil {
 		return err
 	}
-	node, err := snap.NodeByID(args.NodeID)
+	ws := memdb.NewWatchSet()
+	node, err := snap.NodeByID(ws, args.NodeID)
 	if err != nil {
 		return err
 	}
@@ -484,19 +491,14 @@ func (n *Node) GetNode(args *structs.NodeSpecificRequest,
 	opts := blockingOptions{
 		queryOpts: &args.QueryOptions,
 		queryMeta: &reply.QueryMeta,
-		watch:     watch.NewItems(watch.Item{Node: args.NodeID}),
-		run: func() error {
+		run: func(ws memdb.WatchSet, state *state.StateStore) error {
 			// Verify the arguments
 			if args.NodeID == "" {
 				return fmt.Errorf("missing node ID")
 			}

 			// Look for the node
-			snap, err := n.srv.fsm.State().Snapshot()
-			if err != nil {
-				return err
-			}
-			out, err := snap.NodeByID(args.NodeID)
+			out, err := state.NodeByID(ws, args.NodeID)
 			if err != nil {
 				return err
 			}
@@ -509,7 +511,7 @@ func (n *Node) GetNode(args *structs.NodeSpecificRequest,
 				reply.Index = out.ModifyIndex
 			} else {
 				// Use the last index that affected the nodes table
-				index, err := snap.Index("nodes")
+				index, err := state.Index("nodes")
 				if err != nil {
 					return err
 				}
@@ -541,14 +543,9 @@ func (n *Node) GetAllocs(args *structs.NodeSpecificRequest,
 	opts := blockingOptions{
 		queryOpts: &args.QueryOptions,
 		queryMeta: &reply.QueryMeta,
-		watch:     watch.NewItems(watch.Item{AllocNode: args.NodeID}),
-		run: func() error {
+		run: func(ws memdb.WatchSet, state *state.StateStore) error {
 			// Look for the node
-			snap, err := n.srv.fsm.State().Snapshot()
-			if err != nil {
-				return err
-			}
-			allocs, err := snap.AllocsByNode(args.NodeID)
+			allocs, err := state.AllocsByNode(ws, args.NodeID)
 			if err != nil {
 				return err
 			}
@@ -563,7 +560,7 @@ func (n *Node) GetAllocs(args *structs.NodeSpecificRequest,
 				reply.Allocs = nil

 				// Use the last index that affected the nodes table
-				index, err := snap.Index("allocs")
+				index, err := state.Index("allocs")
 				if err != nil {
 					return err
 				}
@@ -599,16 +596,9 @@ func (n *Node) GetClientAllocs(args *structs.NodeSpecificRequest,
 	opts := blockingOptions{
 		queryOpts: &args.QueryOptions,
 		queryMeta: &reply.QueryMeta,
-		watch:     watch.NewItems(watch.Item{AllocNode: args.NodeID}),
-		run: func() error {
+		run: func(ws memdb.WatchSet, state *state.StateStore) error {
 			// Look for the node
-			snap, err := n.srv.fsm.State().Snapshot()
-			if err != nil {
-				return err
-			}
-
-			// Look for the node
-			node, err := snap.NodeByID(args.NodeID)
+			node, err := state.NodeByID(ws, args.NodeID)
 			if err != nil {
 				return err
 			}
@@ -628,7 +618,7 @@ func (n *Node) GetClientAllocs(args *structs.NodeSpecificRequest,
 			}

 			var err error
-			allocs, err = snap.AllocsByNode(args.NodeID)
+			allocs, err = state.AllocsByNode(ws, args.NodeID)
 			if err != nil {
 				return err
 			}
@@ -643,7 +633,7 @@ func (n *Node) GetClientAllocs(args *structs.NodeSpecificRequest,
 				}
 			} else {
 				// Use the last index that affected the nodes table
-				index, err := snap.Index("allocs")
+				index, err := state.Index("allocs")
 				if err != nil {
 					return err
 				}
@@ -734,7 +724,8 @@ func (n *Node) batchUpdate(future *batchFuture, updates []*structs.Allocation) {
 		}

 		// Determine if there are any Vault accessors for the allocation
-		accessors, err := n.srv.State().VaultAccessorsByAlloc(alloc.ID)
+		ws := memdb.NewWatchSet()
+		accessors, err := n.srv.State().VaultAccessorsByAlloc(ws, alloc.ID)
 		if err != nil {
 			n.srv.logger.Printf("[ERR] nomad.client: looking up accessors for alloc %q failed: %v", alloc.ID, err)
 			mErr.Errors = append(mErr.Errors, err)
 		}
@@ -766,18 +757,14 @@ func (n *Node) List(args *structs.NodeListRequest,
 	opts := blockingOptions{
 		queryOpts: &args.QueryOptions,
 		queryMeta: &reply.QueryMeta,
-		watch:     watch.NewItems(watch.Item{Table: "nodes"}),
-		run: func() error {
+		run: func(ws memdb.WatchSet, state *state.StateStore) error {
 			// Capture all the nodes
-			snap, err := n.srv.fsm.State().Snapshot()
-			if err != nil {
-				return err
-			}
+			var err error
 			var iter memdb.ResultIterator
 			if prefix := args.QueryOptions.Prefix; prefix != "" {
-				iter, err = snap.NodesByIDPrefix(prefix)
+				iter, err = state.NodesByIDPrefix(ws, prefix)
 			} else {
-				iter, err = snap.Nodes()
+				iter, err = state.Nodes(ws)
 			}
 			if err != nil {
 				return err
@@ -795,7 +782,7 @@ func (n *Node) List(args *structs.NodeListRequest,
 			reply.Nodes = nodes

 			// Use the last index that affected the jobs table
-			index, err := snap.Index("nodes")
+			index, err := state.Index("nodes")
 			if err != nil {
 				return err
 			}
@@ -818,12 +805,13 @@ func (n *Node) createNodeEvals(nodeID string, nodeIndex uint64) ([]string, uint6
 	}

 	// Find all the allocations for this node
-	allocs, err := snap.AllocsByNode(nodeID)
+	ws := memdb.NewWatchSet()
+	allocs, err := snap.AllocsByNode(ws, nodeID)
 	if err != nil {
 		return nil, 0, fmt.Errorf("failed to find allocs for '%s': %v", nodeID, err)
 	}

-	sysJobsIter, err := snap.JobsByScheduler("system")
+	sysJobsIter, err := snap.JobsByScheduler(ws, "system")
 	if err != nil {
 		return nil, 0, fmt.Errorf("failed to find system jobs for '%s': %v", nodeID, err)
 	}
@@ -985,7 +973,8 @@ func (n *Node) DeriveVaultToken(args *structs.DeriveVaultTokenRequest,
 		setErr(err, false)
 		return nil
 	}
-	node, err := snap.NodeByID(args.NodeID)
+	ws := memdb.NewWatchSet()
+	node, err := snap.NodeByID(ws, args.NodeID)
 	if err != nil {
 		setErr(err, false)
 		return nil
 	}
@@ -999,7 +988,7 @@ func (n *Node) DeriveVaultToken(args *structs.DeriveVaultTokenRequest,
 		return nil
 	}

-	alloc, err := snap.AllocByID(args.AllocID)
+	alloc, err := snap.AllocByID(ws, args.AllocID)
 	if err != nil {
 		setErr(err, false)
 		return nil
 	}
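In callbacks such as GetAllocs and GetClientAllocs above, a single WatchSet accumulates one channel per lookup, replacing the old per-allocation `watch.Item` list. A minimal illustration with bare channels, no Nomad types involved; closing any one channel is enough to wake the watcher:

```go
package main

import (
	"fmt"
	"time"

	memdb "github.com/hashicorp/go-memdb"
)

func main() {
	ws := memdb.NewWatchSet()

	// Pretend each channel came from a different AllocByID lookup.
	chans := make([]chan struct{}, 3)
	for i := range chans {
		chans[i] = make(chan struct{})
		ws.Add(chans[i])
	}

	// Closing a single channel wakes the blocked watcher.
	go func() {
		time.Sleep(10 * time.Millisecond)
		close(chans[1])
	}()

	if expired := ws.Watch(time.After(time.Second)); !expired {
		fmt.Println("one of three watched items changed")
	}
}
```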
diff --git a/nomad/periodic.go b/nomad/periodic.go
index 09dadfdff..b06267585 100644
--- a/nomad/periodic.go
+++ b/nomad/periodic.go
@@ -9,6 +9,7 @@ import (
 	"sync"
 	"time"

+	memdb "github.com/hashicorp/go-memdb"
 	"github.com/hashicorp/nomad/nomad/structs"
 )
@@ -86,8 +87,9 @@ func (s *Server) RunningChildren(job *structs.Job) (bool, error) {
 		return false, err
 	}

+	ws := memdb.NewWatchSet()
 	prefix := fmt.Sprintf("%s%s", job.ID, structs.PeriodicLaunchSuffix)
-	iter, err := state.JobsByIDPrefix(prefix)
+	iter, err := state.JobsByIDPrefix(ws, prefix)
 	if err != nil {
 		return false, err
 	}
@@ -102,7 +104,7 @@ func (s *Server) RunningChildren(job *structs.Job) (bool, error) {
 		}

 		// Get the childs evaluations.
-		evals, err := state.EvalsByJob(child.ID)
+		evals, err := state.EvalsByJob(ws, child.ID)
 		if err != nil {
 			return false, err
 		}
@@ -113,7 +115,7 @@ func (s *Server) RunningChildren(job *structs.Job) (bool, error) {
 			return true, nil
 		}

-		allocs, err := state.AllocsByEval(eval.ID)
+		allocs, err := state.AllocsByEval(ws, eval.ID)
 		if err != nil {
 			return false, err
 		}
diff --git a/nomad/periodic_endpoint.go b/nomad/periodic_endpoint.go
index f8de4ae00..b172c4658 100644
--- a/nomad/periodic_endpoint.go
+++ b/nomad/periodic_endpoint.go
@@ -5,6 +5,7 @@ import (
 	"time"

 	"github.com/armon/go-metrics"
+	memdb "github.com/hashicorp/go-memdb"
 	"github.com/hashicorp/nomad/nomad/structs"
 )
@@ -30,7 +31,9 @@ func (p *Periodic) Force(args *structs.PeriodicForceRequest, reply *structs.Peri
 	if err != nil {
 		return err
 	}
-	job, err := snap.JobByID(args.JobID)
+
+	ws := memdb.NewWatchSet()
+	job, err := snap.JobByID(ws, args.JobID)
 	if err != nil {
 		return err
 	}
diff --git a/nomad/plan_apply.go b/nomad/plan_apply.go
index c094f16e8..5262eb94e 100644
--- a/nomad/plan_apply.go
+++ b/nomad/plan_apply.go
@@ -6,6 +6,7 @@ import (
 	"time"

 	"github.com/armon/go-metrics"
+	memdb "github.com/hashicorp/go-memdb"
 	"github.com/hashicorp/go-multierror"
 	"github.com/hashicorp/nomad/nomad/state"
 	"github.com/hashicorp/nomad/nomad/structs"
@@ -322,7 +323,8 @@ func evaluateNodePlan(snap *state.StateSnapshot, plan *structs.Plan, nodeID stri
 	}

 	// Get the node itself
-	node, err := snap.NodeByID(nodeID)
+	ws := memdb.NewWatchSet()
+	node, err := snap.NodeByID(ws, nodeID)
 	if err != nil {
 		return false, fmt.Errorf("failed to get node '%s': %v", nodeID, err)
 	}
@@ -335,7 +337,7 @@ func evaluateNodePlan(snap *state.StateSnapshot, plan *structs.Plan, nodeID stri
 	}

 	// Get the existing allocations that are non-terminal
-	existingAlloc, err := snap.AllocsByNodeTerminal(nodeID, false)
+	existingAlloc, err := snap.AllocsByNodeTerminal(ws, nodeID, false)
 	if err != nil {
 		return false, fmt.Errorf("failed to get existing allocations for '%s': %v", nodeID, err)
 	}
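The rpc.go hunks below are the hub of the change. Here is a condensed, self-contained paraphrase of the resulting control flow; `blockingOptionsSketch` and its fields are invented for the sketch, and the real `blockingRPC` also handles request forwarding, metrics, and abort-on-snapshot-restore:

```go
package main

import (
	"fmt"
	"time"

	memdb "github.com/hashicorp/go-memdb"
)

// blockingOptionsSketch stands in for nomad's blockingOptions; the real one
// carries *structs.QueryOptions and *structs.QueryMeta instead.
type blockingOptionsSketch struct {
	minQueryIndex uint64
	maxQueryTime  time.Duration
	run           func(memdb.WatchSet) (uint64, error)
}

func blockingSketch(opts blockingOptionsSketch) error {
	timeout := time.NewTimer(opts.maxQueryTime)
	defer timeout.Stop()

RUN_QUERY:
	// Watch tracking is skipped entirely for non-blocking queries.
	var ws memdb.WatchSet
	if opts.minQueryIndex > 0 {
		ws = memdb.NewWatchSet()
	}

	index, err := opts.run(ws)

	// Result not fresh enough: park on the watch set, then try again.
	if err == nil && opts.minQueryIndex > 0 && index <= opts.minQueryIndex {
		if expired := ws.Watch(timeout.C); !expired {
			goto RUN_QUERY
		}
	}
	return err
}

func main() {
	runs := 0
	err := blockingSketch(blockingOptionsSketch{
		minQueryIndex: 5,
		maxQueryTime:  100 * time.Millisecond,
		run: func(ws memdb.WatchSet) (uint64, error) {
			runs++
			return 3, nil // never fresh enough, so the query times out
		},
	})
	fmt.Println("runs:", runs, "err:", err)
}
```

The important property: the WatchSet is rebuilt on every pass, so each retry watches exactly the data the latest query touched.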
diff --git a/nomad/rpc.go b/nomad/rpc.go
index 1357566ff..39e6eea8f 100644
--- a/nomad/rpc.go
+++ b/nomad/rpc.go
@@ -16,7 +16,6 @@ import (
 	"github.com/hashicorp/net-rpc-msgpackrpc"
 	"github.com/hashicorp/nomad/nomad/state"
 	"github.com/hashicorp/nomad/nomad/structs"
-	"github.com/hashicorp/nomad/nomad/watch"
 	"github.com/hashicorp/raft"
 	"github.com/hashicorp/yamux"
 )
@@ -333,7 +332,6 @@ type queryFn func(memdb.WatchSet, *state.StateStore) error
 type blockingOptions struct {
 	queryOpts *structs.QueryOptions
 	queryMeta *structs.QueryMeta
-	watch     watch.Items
 	run       queryFn
 }
@@ -376,7 +374,7 @@ RUN_QUERY:
 	// We can skip all watch tracking if this isn't a blocking query.
 	var ws memdb.WatchSet
-	if queryOpts.MinQueryIndex > 0 {
+	if opts.queryOpts.MinQueryIndex > 0 {
 		ws = memdb.NewWatchSet()

 		// This channel will be closed if a snapshot is restored and the
@@ -385,12 +383,11 @@ RUN_QUERY:
 	}

 	// Block up to the timeout if we didn't see anything fresh.
-	err := fn(ws, state)
+	err := opts.run(ws, state)

 	// Check for minimum query time
 	if err == nil && opts.queryOpts.MinQueryIndex > 0 && opts.queryMeta.Index <= opts.queryOpts.MinQueryIndex {
 		if expired := ws.Watch(timeout.C); !expired {
-			// XXX James can do this behavior too
 			goto RUN_QUERY
 		}
 	}
diff --git a/nomad/worker.go b/nomad/worker.go
index f4ef1fb5c..6a274bf12 100644
--- a/nomad/worker.go
+++ b/nomad/worker.go
@@ -8,6 +8,7 @@ import (
 	"time"

 	"github.com/armon/go-metrics"
+	memdb "github.com/hashicorp/go-memdb"
 	"github.com/hashicorp/nomad/nomad/structs"
 	"github.com/hashicorp/nomad/scheduler"
 )
@@ -446,7 +447,8 @@ func (w *Worker) ReblockEval(eval *structs.Evaluation) error {

 	// Update the evaluation if the queued jobs is not same as what is
 	// recorded in the job summary
-	summary, err := w.srv.fsm.state.JobSummaryByID(eval.JobID)
+	ws := memdb.NewWatchSet()
+	summary, err := w.srv.fsm.state.JobSummaryByID(ws, eval.JobID)
 	if err != nil {
 		return fmt.Errorf("couldn't retreive job summary: %v", err)
 	}
diff --git a/scheduler/context.go b/scheduler/context.go
index 5f3366f46..0e9d483c8 100644
--- a/scheduler/context.go
+++ b/scheduler/context.go
@@ -4,6 +4,7 @@ import (
 	"log"
 	"regexp"

+	memdb "github.com/hashicorp/go-memdb"
 	"github.com/hashicorp/go-version"
 	"github.com/hashicorp/nomad/nomad/structs"
 )
@@ -107,7 +108,8 @@ func (e *EvalContext) Reset() {

 func (e *EvalContext) ProposedAllocs(nodeID string) ([]*structs.Allocation, error) {
 	// Get the existing allocations that are non-terminal
-	existingAlloc, err := e.state.AllocsByNodeTerminal(nodeID, false)
+	ws := memdb.NewWatchSet()
+	existingAlloc, err := e.state.AllocsByNodeTerminal(ws, nodeID, false)
 	if err != nil {
 		return nil, err
 	}
diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go
index 7c2cc24bb..5653d9537 100644
--- a/scheduler/generic_sched.go
+++ b/scheduler/generic_sched.go
@@ -4,6 +4,7 @@ import (
 	"fmt"
 	"log"

+	memdb "github.com/hashicorp/go-memdb"
 	"github.com/hashicorp/go-multierror"
 	"github.com/hashicorp/nomad/nomad/structs"
 )
@@ -183,7 +184,8 @@ func (s *GenericScheduler) createBlockedEval(planFailure bool) error {
 func (s *GenericScheduler) process() (bool, error) {
 	// Lookup the Job by ID
 	var err error
-	s.job, err = s.state.JobByID(s.eval.JobID)
+	ws := memdb.NewWatchSet()
+	s.job, err = s.state.JobByID(ws, s.eval.JobID)
 	if err != nil {
 		return false, fmt.Errorf("failed to get job '%s': %v",
 			s.eval.JobID, err)
@@ -354,7 +356,8 @@ func (s *GenericScheduler) computeJobAllocs() error {
 	}

 	// Lookup the allocations by JobID
-	allocs, err := s.state.AllocsByJob(s.eval.JobID, true)
+	ws := memdb.NewWatchSet()
+	allocs, err := s.state.AllocsByJob(ws, s.eval.JobID, true)
 	if err != nil {
 		return fmt.Errorf("failed to get allocs for job '%s': %v",
 			s.eval.JobID, err)
@@ -513,7 +516,8 @@ func (s *GenericScheduler) findPreferredNode(allocTuple *allocTuple) (node *stru
 	}
 	if taskGroup.EphemeralDisk.Sticky == true {
 		var preferredNode *structs.Node
-		preferredNode, err = s.state.NodeByID(allocTuple.Alloc.NodeID)
+		ws := memdb.NewWatchSet()
+		preferredNode, err = s.state.NodeByID(ws, allocTuple.Alloc.NodeID)
 		if preferredNode.Ready() {
 			node = preferredNode
 		}
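The interface change below is mechanical but worth pausing on: schedulers always run against an immutable snapshot and never block, so every accessor gains a WatchSet parameter purely to keep one signature across the live store and snapshots. A toy, explicitly hypothetical implementation of that contract (simplified to strings; the real interface returns `*structs.Job` and friends):

```go
package main

import (
	"fmt"

	memdb "github.com/hashicorp/go-memdb"
)

// fakeState is a hypothetical stand-in for the scheduler's State interface;
// like a snapshot-backed caller, it accepts and ignores the WatchSet.
type fakeState struct {
	jobs map[string]string
}

func (f *fakeState) JobByID(ws memdb.WatchSet, id string) (string, error) {
	// A live store would call ws.Add(...) for the touched index here.
	job, ok := f.jobs[id]
	if !ok {
		return "", fmt.Errorf("job %q not found", id)
	}
	return job, nil
}

func main() {
	s := &fakeState{jobs: map[string]string{"web": "service job"}}
	job, err := s.JobByID(memdb.NewWatchSet(), "web")
	fmt.Println(job, err)
}
```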
diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go
index c69a5984e..ddbf855c4 100644
--- a/scheduler/scheduler.go
+++ b/scheduler/scheduler.go
@@ -63,22 +63,22 @@ type Scheduler interface {
 type State interface {
 	// Nodes returns an iterator over all the nodes.
 	// The type of each result is *structs.Node
-	Nodes() (memdb.ResultIterator, error)
+	Nodes(ws memdb.WatchSet) (memdb.ResultIterator, error)

 	// AllocsByJob returns the allocations by JobID
-	AllocsByJob(jobID string, all bool) ([]*structs.Allocation, error)
+	AllocsByJob(ws memdb.WatchSet, jobID string, all bool) ([]*structs.Allocation, error)

 	// AllocsByNode returns all the allocations by node
-	AllocsByNode(node string) ([]*structs.Allocation, error)
+	AllocsByNode(ws memdb.WatchSet, node string) ([]*structs.Allocation, error)

 	// AllocsByNodeTerminal returns all the allocations by node filtering by terminal status
-	AllocsByNodeTerminal(node string, terminal bool) ([]*structs.Allocation, error)
+	AllocsByNodeTerminal(ws memdb.WatchSet, node string, terminal bool) ([]*structs.Allocation, error)

 	// GetNodeByID is used to lookup a node by ID
-	NodeByID(nodeID string) (*structs.Node, error)
+	NodeByID(ws memdb.WatchSet, nodeID string) (*structs.Node, error)

 	// GetJobByID is used to lookup a job by ID
-	JobByID(id string) (*structs.Job, error)
+	JobByID(ws memdb.WatchSet, id string) (*structs.Job, error)
 }

 // Planner interface is used to submit a task allocation plan.
diff --git a/scheduler/system_sched.go b/scheduler/system_sched.go
index f68de6b8f..755153d9c 100644
--- a/scheduler/system_sched.go
+++ b/scheduler/system_sched.go
@@ -4,6 +4,7 @@ import (
 	"fmt"
 	"log"

+	memdb "github.com/hashicorp/go-memdb"
 	"github.com/hashicorp/nomad/nomad/structs"
 )
@@ -87,7 +88,8 @@ func (s *SystemScheduler) Process(eval *structs.Evaluation) error {
 func (s *SystemScheduler) process() (bool, error) {
 	// Lookup the Job by ID
 	var err error
-	s.job, err = s.state.JobByID(s.eval.JobID)
+	ws := memdb.NewWatchSet()
+	s.job, err = s.state.JobByID(ws, s.eval.JobID)
 	if err != nil {
 		return false, fmt.Errorf("failed to get job '%s': %v",
 			s.eval.JobID, err)
@@ -178,7 +180,8 @@ func (s *SystemScheduler) process() (bool, error) {
 // existing allocations and node status to update the allocations.
 func (s *SystemScheduler) computeJobAllocs() error {
 	// Lookup the allocations by JobID
-	allocs, err := s.state.AllocsByJob(s.eval.JobID, true)
+	ws := memdb.NewWatchSet()
+	allocs, err := s.state.AllocsByJob(ws, s.eval.JobID, true)
 	if err != nil {
 		return fmt.Errorf("failed to get allocs for job '%s': %v",
 			s.eval.JobID, err)
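For test code like the harness below, the watch channels also make invalidation directly assertable. A sketch in plain go-memdb (record type invented) checking both outcomes — `Watch` expires when nothing relevant changed, and fires after a write:

```go
package main

import (
	"fmt"
	"time"

	memdb "github.com/hashicorp/go-memdb"
)

// Eval is an invented record type; the real harness works with *structs.Evaluation.
type Eval struct {
	ID string
}

func main() {
	db, err := memdb.NewMemDB(&memdb.DBSchema{
		Tables: map[string]*memdb.TableSchema{
			"evals": {
				Name: "evals",
				Indexes: map[string]*memdb.IndexSchema{
					"id": {Name: "id", Unique: true, Indexer: &memdb.StringFieldIndex{Field: "ID"}},
				},
			},
		},
	})
	if err != nil {
		panic(err)
	}

	// Arm a watch on a missing eval.
	ws := memdb.NewWatchSet()
	txn := db.Txn(false)
	watchCh, raw, err := txn.FirstWatch("evals", "id", "e1")
	if err != nil || raw != nil {
		panic("expected a miss")
	}
	ws.Add(watchCh)

	// 1) No writes yet: a short Watch must time out.
	if expired := ws.Watch(time.After(20 * time.Millisecond)); !expired {
		panic("woke up with no write")
	}

	// 2) After a write, the same set must fire instead of timing out.
	w := db.Txn(true)
	if err := w.Insert("evals", &Eval{ID: "e1"}); err != nil {
		panic(err)
	}
	w.Commit()
	if expired := ws.Watch(time.After(time.Second)); expired {
		panic("watch never fired")
	}
	fmt.Println("watch fired after insert")
}
```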
diff --git a/scheduler/testing.go b/scheduler/testing.go
index 08254eae3..74c01c486 100644
--- a/scheduler/testing.go
+++ b/scheduler/testing.go
@@ -7,6 +7,7 @@ import (
 	"sync"
 	"testing"

+	memdb "github.com/hashicorp/go-memdb"
 	"github.com/hashicorp/nomad/nomad/state"
 	"github.com/hashicorp/nomad/nomad/structs"
 )
@@ -159,7 +160,8 @@ func (h *Harness) ReblockEval(eval *structs.Evaluation) error {
 	defer h.planLock.Unlock()

 	// Check that the evaluation was already blocked.
-	old, err := h.State.EvalByID(eval.ID)
+	ws := memdb.NewWatchSet()
+	old, err := h.State.EvalByID(ws, eval.ID)
 	if err != nil {
 		return err
 	}
diff --git a/scheduler/util.go b/scheduler/util.go
index 1ed306b76..f305134cd 100644
--- a/scheduler/util.go
+++ b/scheduler/util.go
@@ -6,6 +6,7 @@ import (
 	"math/rand"
 	"reflect"

+	memdb "github.com/hashicorp/go-memdb"
 	"github.com/hashicorp/nomad/nomad/structs"
 )
@@ -228,8 +229,9 @@ func readyNodesInDCs(state State, dcs []string) ([]*structs.Node, map[string]int
 	}

 	// Scan the nodes
+	ws := memdb.NewWatchSet()
 	var out []*structs.Node
-	iter, err := state.Nodes()
+	iter, err := state.Nodes(ws)
 	if err != nil {
 		return nil, nil, err
 	}
@@ -301,7 +303,8 @@ func taintedNodes(state State, allocs []*structs.Allocation) (map[string]*struct
 			continue
 		}

-		node, err := state.NodeByID(alloc.NodeID)
+		ws := memdb.NewWatchSet()
+		node, err := state.NodeByID(ws, alloc.NodeID)
 		if err != nil {
 			return nil, err
 		}
@@ -452,6 +455,7 @@ func setStatus(logger *log.Logger, planner Planner,
 func inplaceUpdate(ctx Context, eval *structs.Evaluation, job *structs.Job,
 	stack Stack, updates []allocTuple) (destructive, inplace []allocTuple) {

+	ws := memdb.NewWatchSet()
 	n := len(updates)
 	inplaceCount := 0
 	for i := 0; i < n; i++ {
@@ -471,7 +475,7 @@ func inplaceUpdate(ctx Context, eval *structs.Evaluation, job *structs.Job,
 		}

 		// Get the existing node
-		node, err := ctx.State().NodeByID(update.Alloc.NodeID)
+		node, err := ctx.State().NodeByID(ws, update.Alloc.NodeID)
 		if err != nil {
 			ctx.Logger().Printf("[ERR] sched: %#v failed to get node '%s': %v",
 				eval, update.Alloc.NodeID, err)
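End to end, this server-side machinery is what a long-polling client exercises. A closing sketch against Nomad's public Go API — it assumes a local agent is running, and `WaitIndex`/`WaitTime` in the client's `QueryOptions` map onto the MinQueryIndex handling in rpc.go above:

```go
package main

import (
	"fmt"
	"time"

	"github.com/hashicorp/nomad/api"
)

func main() {
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		panic(err)
	}

	var waitIndex uint64
	for i := 0; i < 3; i++ {
		// WaitIndex > 0 makes this a blocking query: the server parks it on
		// a WatchSet until the allocs table moves past waitIndex or WaitTime ends.
		allocs, meta, err := client.Allocations().List(&api.QueryOptions{
			WaitIndex: waitIndex,
			WaitTime:  30 * time.Second,
		})
		if err != nil {
			panic(err)
		}
		fmt.Printf("saw %d allocs at index %d\n", len(allocs), meta.LastIndex)
		waitIndex = meta.LastIndex
	}
}
```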