Nomad builds

This commit is contained in:
Alex Dadgar
2017-02-07 20:31:23 -08:00
parent ff2d929e2b
commit 5b75b29af4
19 changed files with 206 additions and 217 deletions

View File

@@ -5,8 +5,8 @@ import (
"github.com/armon/go-metrics"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/nomad/watch"
)
// Alloc endpoint is used for manipulating allocations
@@ -25,18 +25,14 @@ func (a *Alloc) List(args *structs.AllocListRequest, reply *structs.AllocListRes
opts := blockingOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
watch: watch.NewItems(watch.Item{Table: "allocs"}),
run: func() error {
run: func(ws memdb.WatchSet, state *state.StateStore) error {
// Capture all the allocations
snap, err := a.srv.fsm.State().Snapshot()
if err != nil {
return err
}
var err error
var iter memdb.ResultIterator
if prefix := args.QueryOptions.Prefix; prefix != "" {
iter, err = snap.AllocsByIDPrefix(prefix)
iter, err = state.AllocsByIDPrefix(ws, prefix)
} else {
iter, err = snap.Allocs()
iter, err = state.Allocs(ws)
}
if err != nil {
return err
@@ -54,7 +50,7 @@ func (a *Alloc) List(args *structs.AllocListRequest, reply *structs.AllocListRes
reply.Allocations = allocs
// Use the last index that affected the jobs table
index, err := snap.Index("allocs")
index, err := state.Index("allocs")
if err != nil {
return err
}
@@ -79,14 +75,9 @@ func (a *Alloc) GetAlloc(args *structs.AllocSpecificRequest,
opts := blockingOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
watch: watch.NewItems(watch.Item{Alloc: args.AllocID}),
run: func() error {
run: func(ws memdb.WatchSet, state *state.StateStore) error {
// Lookup the allocation
snap, err := a.srv.fsm.State().Snapshot()
if err != nil {
return err
}
out, err := snap.AllocByID(args.AllocID)
out, err := state.AllocByID(ws, args.AllocID)
if err != nil {
return err
}
@@ -97,7 +88,7 @@ func (a *Alloc) GetAlloc(args *structs.AllocSpecificRequest,
reply.Index = out.ModifyIndex
} else {
// Use the last index that affected the allocs table
index, err := snap.Index("allocs")
index, err := state.Index("allocs")
if err != nil {
return err
}
@@ -119,12 +110,6 @@ func (a *Alloc) GetAllocs(args *structs.AllocsGetRequest,
}
defer metrics.MeasureSince([]string{"nomad", "alloc", "get_allocs"}, time.Now())
// Build the watch
items := make([]watch.Item, 0, len(args.AllocIDs))
for _, allocID := range args.AllocIDs {
items = append(items, watch.Item{Alloc: allocID})
}
allocs := make([]*structs.Allocation, len(args.AllocIDs))
// Setup the blocking query. We wait for at least one of the requested
@@ -133,18 +118,12 @@ func (a *Alloc) GetAllocs(args *structs.AllocsGetRequest,
opts := blockingOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
watch: watch.NewItems(items...),
run: func() error {
run: func(ws memdb.WatchSet, state *state.StateStore) error {
// Lookup the allocation
snap, err := a.srv.fsm.State().Snapshot()
if err != nil {
return err
}
thresholdMet := false
maxIndex := uint64(0)
for i, alloc := range args.AllocIDs {
out, err := snap.AllocByID(alloc)
out, err := state.AllocByID(ws, alloc)
if err != nil {
return err
}
@@ -173,7 +152,7 @@ func (a *Alloc) GetAllocs(args *structs.AllocsGetRequest,
reply.Index = maxIndex
} else {
// Use the last index that affected the nodes table
index, err := snap.Index("allocs")
index, err := state.Index("allocs")
if err != nil {
return err
}

View File

@@ -5,6 +5,7 @@ import (
"math"
"time"
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/scheduler"
@@ -67,7 +68,8 @@ func (c *CoreScheduler) forceGC(eval *structs.Evaluation) error {
// jobGC is used to garbage collect eligible jobs.
func (c *CoreScheduler) jobGC(eval *structs.Evaluation) error {
// Get all the jobs eligible for garbage collection.
iter, err := c.snap.JobsByGC(true)
ws := memdb.NewWatchSet()
iter, err := c.snap.JobsByGC(ws, true)
if err != nil {
return err
}
@@ -99,7 +101,8 @@ OUTER:
continue
}
evals, err := c.snap.EvalsByJob(job.ID)
ws := memdb.NewWatchSet()
evals, err := c.snap.EvalsByJob(ws, job.ID)
if err != nil {
c.srv.logger.Printf("[ERR] sched.core: failed to get evals for job %s: %v", job.ID, err)
continue
@@ -163,7 +166,8 @@ OUTER:
// evalGC is used to garbage collect old evaluations
func (c *CoreScheduler) evalGC(eval *structs.Evaluation) error {
// Iterate over the evaluations
iter, err := c.snap.Evals()
ws := memdb.NewWatchSet()
iter, err := c.snap.Evals(ws)
if err != nil {
return err
}
@@ -227,6 +231,9 @@ func (c *CoreScheduler) gcEval(eval *structs.Evaluation, thresholdIndex uint64,
return false, nil, nil
}
// Create a watchset
ws := memdb.NewWatchSet()
// If the eval is from a running "batch" job we don't want to garbage
// collect its allocations. If there is a long running batch job and its
// terminal allocations get GC'd the scheduler would re-run the
@@ -237,7 +244,7 @@ func (c *CoreScheduler) gcEval(eval *structs.Evaluation, thresholdIndex uint64,
}
// Check if the job is running
job, err := c.snap.JobByID(eval.JobID)
job, err := c.snap.JobByID(ws, eval.JobID)
if err != nil {
return false, nil, err
}
@@ -249,7 +256,7 @@ func (c *CoreScheduler) gcEval(eval *structs.Evaluation, thresholdIndex uint64,
}
// Get the allocations by eval
allocs, err := c.snap.AllocsByEval(eval.ID)
allocs, err := c.snap.AllocsByEval(ws, eval.ID)
if err != nil {
c.srv.logger.Printf("[ERR] sched.core: failed to get allocs for eval %s: %v",
eval.ID, err)
@@ -336,7 +343,8 @@ func (c *CoreScheduler) partitionReap(evals, allocs []string) []*structs.EvalDel
// nodeGC is used to garbage collect old nodes
func (c *CoreScheduler) nodeGC(eval *structs.Evaluation) error {
// Iterate over the evaluations
iter, err := c.snap.Nodes()
ws := memdb.NewWatchSet()
iter, err := c.snap.Nodes(ws)
if err != nil {
return err
}
@@ -374,7 +382,8 @@ OUTER:
}
// Get the allocations by node
allocs, err := c.snap.AllocsByNode(node.ID)
ws := memdb.NewWatchSet()
allocs, err := c.snap.AllocsByNode(ws, node.ID)
if err != nil {
c.srv.logger.Printf("[ERR] sched.core: failed to get allocs for node %s: %v",
eval.ID, err)

View File

@@ -6,8 +6,8 @@ import (
"github.com/armon/go-metrics"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/nomad/watch"
"github.com/hashicorp/nomad/scheduler"
)
@@ -33,14 +33,9 @@ func (e *Eval) GetEval(args *structs.EvalSpecificRequest,
opts := blockingOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
watch: watch.NewItems(watch.Item{Eval: args.EvalID}),
run: func() error {
run: func(ws memdb.WatchSet, state *state.StateStore) error {
// Look for the job
snap, err := e.srv.fsm.State().Snapshot()
if err != nil {
return err
}
out, err := snap.EvalByID(args.EvalID)
out, err := state.EvalByID(ws, args.EvalID)
if err != nil {
return err
}
@@ -51,7 +46,7 @@ func (e *Eval) GetEval(args *structs.EvalSpecificRequest,
reply.Index = out.ModifyIndex
} else {
// Use the last index that affected the nodes table
index, err := snap.Index("evals")
index, err := state.Index("evals")
if err != nil {
return err
}
@@ -190,7 +185,9 @@ func (e *Eval) Create(args *structs.EvalUpdateRequest,
if err != nil {
return err
}
out, err := snap.EvalByID(eval.ID)
ws := memdb.NewWatchSet()
out, err := snap.EvalByID(ws, eval.ID)
if err != nil {
return err
}
@@ -233,7 +230,9 @@ func (e *Eval) Reblock(args *structs.EvalUpdateRequest, reply *structs.GenericRe
if err != nil {
return err
}
out, err := snap.EvalByID(eval.ID)
ws := memdb.NewWatchSet()
out, err := snap.EvalByID(ws, eval.ID)
if err != nil {
return err
}
@@ -280,18 +279,14 @@ func (e *Eval) List(args *structs.EvalListRequest,
opts := blockingOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
watch: watch.NewItems(watch.Item{Table: "evals"}),
run: func() error {
run: func(ws memdb.WatchSet, state *state.StateStore) error {
// Scan all the evaluations
snap, err := e.srv.fsm.State().Snapshot()
if err != nil {
return err
}
var err error
var iter memdb.ResultIterator
if prefix := args.QueryOptions.Prefix; prefix != "" {
iter, err = snap.EvalsByIDPrefix(prefix)
iter, err = state.EvalsByIDPrefix(ws, prefix)
} else {
iter, err = snap.Evals()
iter, err = state.Evals(ws)
}
if err != nil {
return err
@@ -309,7 +304,7 @@ func (e *Eval) List(args *structs.EvalListRequest,
reply.Evaluations = evals
// Use the last index that affected the jobs table
index, err := snap.Index("evals")
index, err := state.Index("evals")
if err != nil {
return err
}
@@ -334,14 +329,9 @@ func (e *Eval) Allocations(args *structs.EvalSpecificRequest,
opts := blockingOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
watch: watch.NewItems(watch.Item{AllocEval: args.EvalID}),
run: func() error {
run: func(ws memdb.WatchSet, state *state.StateStore) error {
// Capture the allocations
snap, err := e.srv.fsm.State().Snapshot()
if err != nil {
return err
}
allocs, err := snap.AllocsByEval(args.EvalID)
allocs, err := state.AllocsByEval(ws, args.EvalID)
if err != nil {
return err
}
@@ -355,7 +345,7 @@ func (e *Eval) Allocations(args *structs.EvalSpecificRequest,
}
// Use the last index that affected the allocs table
index, err := snap.Index("allocs")
index, err := state.Index("allocs")
if err != nil {
return err
}

View File

@@ -9,6 +9,7 @@ import (
"time"
"github.com/armon/go-metrics"
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/scheduler"
@@ -212,7 +213,8 @@ func (n *nomadFSM) applyStatusUpdate(buf []byte, index uint64) interface{} {
// Unblock evals for the nodes computed node class if it is in a ready
// state.
if req.Status == structs.NodeStatusReady {
node, err := n.state.NodeByID(req.NodeID)
ws := memdb.NewWatchSet()
node, err := n.state.NodeByID(ws, req.NodeID)
if err != nil {
n.logger.Printf("[ERR] nomad.fsm: looking up node %q failed: %v", req.NodeID, err)
return err
@@ -265,13 +267,16 @@ func (n *nomadFSM) applyUpsertJob(buf []byte, index uint64) interface{} {
return err
}
// Create a watch set
ws := memdb.NewWatchSet()
// If it is periodic, record the time it was inserted. This is necessary for
// recovering during leader election. It is possible that from the time it
// is added to when it was suppose to launch, leader election occurs and the
// job was not launched. In this case, we use the insertion time to
// determine if a launch was missed.
if req.Job.IsPeriodic() {
prevLaunch, err := n.state.PeriodicLaunchByID(req.Job.ID)
prevLaunch, err := n.state.PeriodicLaunchByID(ws, req.Job.ID)
if err != nil {
n.logger.Printf("[ERR] nomad.fsm: PeriodicLaunchByID failed: %v", err)
return err
@@ -291,7 +296,7 @@ func (n *nomadFSM) applyUpsertJob(buf []byte, index uint64) interface{} {
// Check if the parent job is periodic and mark the launch time.
parentID := req.Job.ParentID
if parentID != "" {
parent, err := n.state.JobByID(parentID)
parent, err := n.state.JobByID(ws, parentID)
if err != nil {
n.logger.Printf("[ERR] nomad.fsm: JobByID(%v) lookup for parent failed: %v", parentID, err)
return err
@@ -444,9 +449,12 @@ func (n *nomadFSM) applyAllocClientUpdate(buf []byte, index uint64) interface{}
return nil
}
// Create a watch set
ws := memdb.NewWatchSet()
// Updating the allocs with the job id and task group name
for _, alloc := range req.Alloc {
if existing, _ := n.state.AllocByID(alloc.ID); existing != nil {
if existing, _ := n.state.AllocByID(ws, alloc.ID); existing != nil {
alloc.JobID = existing.JobID
alloc.TaskGroup = existing.TaskGroup
}
@@ -464,7 +472,7 @@ func (n *nomadFSM) applyAllocClientUpdate(buf []byte, index uint64) interface{}
if alloc.ClientStatus == structs.AllocClientStatusComplete ||
alloc.ClientStatus == structs.AllocClientStatusFailed {
nodeID := alloc.NodeID
node, err := n.state.NodeByID(nodeID)
node, err := n.state.NodeByID(ws, nodeID)
if err != nil || node == nil {
n.logger.Printf("[ERR] nomad.fsm: looking up node %q failed: %v", nodeID, err)
return err
@@ -706,7 +714,8 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error {
// created a Job Summary during the snap shot restore
func (n *nomadFSM) reconcileQueuedAllocations(index uint64) error {
// Get all the jobs
iter, err := n.state.Jobs()
ws := memdb.NewWatchSet()
iter, err := n.state.Jobs(ws)
if err != nil {
return err
}
@@ -750,7 +759,7 @@ func (n *nomadFSM) reconcileQueuedAllocations(index uint64) error {
}
// Get the job summary from the fsm state store
originalSummary, err := n.state.JobSummaryByID(job.ID)
originalSummary, err := n.state.JobSummaryByID(ws, job.ID)
if err != nil {
return err
}
@@ -886,7 +895,8 @@ func (s *nomadSnapshot) persistIndexes(sink raft.SnapshotSink,
func (s *nomadSnapshot) persistNodes(sink raft.SnapshotSink,
encoder *codec.Encoder) error {
// Get all the nodes
nodes, err := s.snap.Nodes()
ws := memdb.NewWatchSet()
nodes, err := s.snap.Nodes(ws)
if err != nil {
return err
}
@@ -913,7 +923,8 @@ func (s *nomadSnapshot) persistNodes(sink raft.SnapshotSink,
func (s *nomadSnapshot) persistJobs(sink raft.SnapshotSink,
encoder *codec.Encoder) error {
// Get all the jobs
jobs, err := s.snap.Jobs()
ws := memdb.NewWatchSet()
jobs, err := s.snap.Jobs(ws)
if err != nil {
return err
}
@@ -940,7 +951,8 @@ func (s *nomadSnapshot) persistJobs(sink raft.SnapshotSink,
func (s *nomadSnapshot) persistEvals(sink raft.SnapshotSink,
encoder *codec.Encoder) error {
// Get all the evaluations
evals, err := s.snap.Evals()
ws := memdb.NewWatchSet()
evals, err := s.snap.Evals(ws)
if err != nil {
return err
}
@@ -967,7 +979,8 @@ func (s *nomadSnapshot) persistEvals(sink raft.SnapshotSink,
func (s *nomadSnapshot) persistAllocs(sink raft.SnapshotSink,
encoder *codec.Encoder) error {
// Get all the allocations
allocs, err := s.snap.Allocs()
ws := memdb.NewWatchSet()
allocs, err := s.snap.Allocs(ws)
if err != nil {
return err
}
@@ -994,7 +1007,8 @@ func (s *nomadSnapshot) persistAllocs(sink raft.SnapshotSink,
func (s *nomadSnapshot) persistPeriodicLaunches(sink raft.SnapshotSink,
encoder *codec.Encoder) error {
// Get all the jobs
launches, err := s.snap.PeriodicLaunches()
ws := memdb.NewWatchSet()
launches, err := s.snap.PeriodicLaunches(ws)
if err != nil {
return err
}
@@ -1021,7 +1035,8 @@ func (s *nomadSnapshot) persistPeriodicLaunches(sink raft.SnapshotSink,
func (s *nomadSnapshot) persistJobSummaries(sink raft.SnapshotSink,
encoder *codec.Encoder) error {
summaries, err := s.snap.JobSummaries()
ws := memdb.NewWatchSet()
summaries, err := s.snap.JobSummaries(ws)
if err != nil {
return err
}
@@ -1045,7 +1060,8 @@ func (s *nomadSnapshot) persistJobSummaries(sink raft.SnapshotSink,
func (s *nomadSnapshot) persistVaultAccessors(sink raft.SnapshotSink,
encoder *codec.Encoder) error {
accessors, err := s.snap.VaultAccessors()
ws := memdb.NewWatchSet()
accessors, err := s.snap.VaultAccessors(ws)
if err != nil {
return err
}

View File

@@ -5,6 +5,7 @@ import (
"github.com/armon/go-metrics"
"github.com/hashicorp/consul/lib"
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/nomad/structs"
)
@@ -19,7 +20,8 @@ func (s *Server) initializeHeartbeatTimers() error {
}
// Get an iterator over nodes
iter, err := snap.Nodes()
ws := memdb.NewWatchSet()
iter, err := snap.Nodes(ws)
if err != nil {
return err
}

View File

@@ -13,8 +13,8 @@ import (
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/client/driver"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/nomad/watch"
"github.com/hashicorp/nomad/scheduler"
)
@@ -72,7 +72,8 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis
if err != nil {
return err
}
job, err := snap.JobByID(args.Job.ID)
ws := memdb.NewWatchSet()
job, err := snap.JobByID(ws, args.Job.ID)
if err != nil {
return err
}
@@ -257,15 +258,9 @@ func (j *Job) Summary(args *structs.JobSummaryRequest,
opts := blockingOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
watch: watch.NewItems(watch.Item{JobSummary: args.JobID}),
run: func() error {
snap, err := j.srv.fsm.State().Snapshot()
if err != nil {
return err
}
run: func(ws memdb.WatchSet, state *state.StateStore) error {
// Look for job summary
out, err := snap.JobSummaryByID(args.JobID)
out, err := state.JobSummaryByID(ws, args.JobID)
if err != nil {
return err
}
@@ -276,7 +271,7 @@ func (j *Job) Summary(args *structs.JobSummaryRequest,
reply.Index = out.ModifyIndex
} else {
// Use the last index that affected the job_summary table
index, err := snap.Index("job_summary")
index, err := state.Index("job_summary")
if err != nil {
return err
}
@@ -307,7 +302,8 @@ func (j *Job) Evaluate(args *structs.JobEvaluateRequest, reply *structs.JobRegis
if err != nil {
return err
}
job, err := snap.JobByID(args.JobID)
ws := memdb.NewWatchSet()
job, err := snap.JobByID(ws, args.JobID)
if err != nil {
return err
}
@@ -368,7 +364,8 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD
if err != nil {
return err
}
job, err := snap.JobByID(args.JobID)
ws := memdb.NewWatchSet()
job, err := snap.JobByID(ws, args.JobID)
if err != nil {
return err
}
@@ -432,15 +429,9 @@ func (j *Job) GetJob(args *structs.JobSpecificRequest,
opts := blockingOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
watch: watch.NewItems(watch.Item{Job: args.JobID}),
run: func() error {
run: func(ws memdb.WatchSet, state *state.StateStore) error {
// Look for the job
snap, err := j.srv.fsm.State().Snapshot()
if err != nil {
return err
}
out, err := snap.JobByID(args.JobID)
out, err := state.JobByID(ws, args.JobID)
if err != nil {
return err
}
@@ -451,7 +442,7 @@ func (j *Job) GetJob(args *structs.JobSpecificRequest,
reply.Index = out.ModifyIndex
} else {
// Use the last index that affected the nodes table
index, err := snap.Index("jobs")
index, err := state.Index("jobs")
if err != nil {
return err
}
@@ -477,18 +468,14 @@ func (j *Job) List(args *structs.JobListRequest,
opts := blockingOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
watch: watch.NewItems(watch.Item{Table: "jobs"}),
run: func() error {
run: func(ws memdb.WatchSet, state *state.StateStore) error {
// Capture all the jobs
snap, err := j.srv.fsm.State().Snapshot()
if err != nil {
return err
}
var err error
var iter memdb.ResultIterator
if prefix := args.QueryOptions.Prefix; prefix != "" {
iter, err = snap.JobsByIDPrefix(prefix)
iter, err = state.JobsByIDPrefix(ws, prefix)
} else {
iter, err = snap.Jobs()
iter, err = state.Jobs(ws)
}
if err != nil {
return err
@@ -501,7 +488,7 @@ func (j *Job) List(args *structs.JobListRequest,
break
}
job := raw.(*structs.Job)
summary, err := snap.JobSummaryByID(job.ID)
summary, err := state.JobSummaryByID(ws, job.ID)
if err != nil {
return fmt.Errorf("unable to look up summary for job: %v", job.ID)
}
@@ -510,7 +497,7 @@ func (j *Job) List(args *structs.JobListRequest,
reply.Jobs = jobs
// Use the last index that affected the jobs table
index, err := snap.Index("jobs")
index, err := state.Index("jobs")
if err != nil {
return err
}
@@ -535,14 +522,9 @@ func (j *Job) Allocations(args *structs.JobSpecificRequest,
opts := blockingOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
watch: watch.NewItems(watch.Item{AllocJob: args.JobID}),
run: func() error {
run: func(ws memdb.WatchSet, state *state.StateStore) error {
// Capture the allocations
snap, err := j.srv.fsm.State().Snapshot()
if err != nil {
return err
}
allocs, err := snap.AllocsByJob(args.JobID, args.AllAllocs)
allocs, err := state.AllocsByJob(ws, args.JobID, args.AllAllocs)
if err != nil {
return err
}
@@ -556,7 +538,7 @@ func (j *Job) Allocations(args *structs.JobSpecificRequest,
}
// Use the last index that affected the allocs table
index, err := snap.Index("allocs")
index, err := state.Index("allocs")
if err != nil {
return err
}
@@ -582,21 +564,16 @@ func (j *Job) Evaluations(args *structs.JobSpecificRequest,
opts := blockingOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
watch: watch.NewItems(watch.Item{EvalJob: args.JobID}),
run: func() error {
run: func(ws memdb.WatchSet, state *state.StateStore) error {
// Capture the evals
snap, err := j.srv.fsm.State().Snapshot()
if err != nil {
return err
}
reply.Evaluations, err = snap.EvalsByJob(args.JobID)
var err error
reply.Evaluations, err = state.EvalsByJob(ws, args.JobID)
if err != nil {
return err
}
// Use the last index that affected the evals table
index, err := snap.Index("evals")
index, err := state.Index("evals")
if err != nil {
return err
}
@@ -641,7 +618,8 @@ func (j *Job) Plan(args *structs.JobPlanRequest, reply *structs.JobPlanResponse)
}
// Get the original job
oldJob, err := snap.JobByID(args.Job.ID)
ws := memdb.NewWatchSet()
oldJob, err := snap.JobByID(ws, args.Job.ID)
if err != nil {
return err
}
@@ -797,7 +775,8 @@ func (j *Job) Dispatch(args *structs.JobDispatchRequest, reply *structs.JobDispa
if err != nil {
return err
}
parameterizedJob, err := snap.JobByID(args.JobID)
ws := memdb.NewWatchSet()
parameterizedJob, err := snap.JobByID(ws, args.JobID)
if err != nil {
return err
}

View File

@@ -7,6 +7,7 @@ import (
"time"
"github.com/armon/go-metrics"
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/raft"
"github.com/hashicorp/serf/serf"
@@ -191,7 +192,8 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error {
// a leadership transition takes place.
func (s *Server) restoreEvals() error {
// Get an iterator over every evaluation
iter, err := s.fsm.State().Evals()
ws := memdb.NewWatchSet()
iter, err := s.fsm.State().Evals(ws)
if err != nil {
return fmt.Errorf("failed to get evaluations: %v", err)
}
@@ -216,8 +218,9 @@ func (s *Server) restoreEvals() error {
// revoked.
func (s *Server) restoreRevokingAccessors() error {
// An accessor should be revoked if its allocation or node is terminal
ws := memdb.NewWatchSet()
state := s.fsm.State()
iter, err := state.VaultAccessors()
iter, err := state.VaultAccessors(ws)
if err != nil {
return fmt.Errorf("failed to get vault accessors: %v", err)
}
@@ -232,7 +235,7 @@ func (s *Server) restoreRevokingAccessors() error {
va := raw.(*structs.VaultAccessor)
// Check the allocation
alloc, err := state.AllocByID(va.AllocID)
alloc, err := state.AllocByID(ws, va.AllocID)
if err != nil {
return fmt.Errorf("failed to lookup allocation: %v", va.AllocID, err)
}
@@ -243,7 +246,7 @@ func (s *Server) restoreRevokingAccessors() error {
}
// Check the node
node, err := state.NodeByID(va.NodeID)
node, err := state.NodeByID(ws, va.NodeID)
if err != nil {
return fmt.Errorf("failed to lookup node %q: %v", va.NodeID, err)
}
@@ -269,7 +272,8 @@ func (s *Server) restoreRevokingAccessors() error {
// dispatcher is maintained only by the leader, so it must be restored anytime a
// leadership transition takes place.
func (s *Server) restorePeriodicDispatcher() error {
iter, err := s.fsm.State().JobsByPeriodic(true)
ws := memdb.NewWatchSet()
iter, err := s.fsm.State().JobsByPeriodic(ws, true)
if err != nil {
return fmt.Errorf("failed to get periodic jobs: %v", err)
}
@@ -282,7 +286,7 @@ func (s *Server) restorePeriodicDispatcher() error {
// If the periodic job has never been launched before, launch will hold
// the time the periodic job was added. Otherwise it has the last launch
// time of the periodic job.
launch, err := s.fsm.State().PeriodicLaunchByID(job.ID)
launch, err := s.fsm.State().PeriodicLaunchByID(ws, job.ID)
if err != nil || launch == nil {
return fmt.Errorf("failed to get periodic launch time: %v", err)
}

View File

@@ -14,7 +14,6 @@ import (
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/nomad/watch"
"github.com/hashicorp/raft"
vapi "github.com/hashicorp/vault/api"
)
@@ -103,7 +102,9 @@ func (n *Node) Register(args *structs.NodeRegisterRequest, reply *structs.NodeUp
if err != nil {
return err
}
originalNode, err := snap.NodeByID(args.Node.ID)
ws := memdb.NewWatchSet()
originalNode, err := snap.NodeByID(ws, args.Node.ID)
if err != nil {
return err
}
@@ -203,7 +204,8 @@ func (n *Node) constructNodeServerInfoResponse(snap *state.StateSnapshot, reply
// Snapshot is used only to iterate over all nodes to create a node
// count to send back to Nomad Clients in their heartbeat so Clients
// can estimate the size of the cluster.
iter, err := snap.Nodes()
ws := memdb.NewWatchSet()
iter, err := snap.Nodes(ws)
if err == nil {
for {
raw := iter.Next()
@@ -248,7 +250,8 @@ func (n *Node) Deregister(args *structs.NodeDeregisterRequest, reply *structs.No
}
// Determine if there are any Vault accessors on the node
accessors, err := n.srv.State().VaultAccessorsByNode(args.NodeID)
ws := memdb.NewWatchSet()
accessors, err := n.srv.State().VaultAccessorsByNode(ws, args.NodeID)
if err != nil {
n.srv.logger.Printf("[ERR] nomad.client: looking up accessors for node %q failed: %v", args.NodeID, err)
return err
@@ -289,7 +292,9 @@ func (n *Node) UpdateStatus(args *structs.NodeUpdateStatusRequest, reply *struct
if err != nil {
return err
}
node, err := snap.NodeByID(args.NodeID)
ws := memdb.NewWatchSet()
node, err := snap.NodeByID(ws, args.NodeID)
if err != nil {
return err
}
@@ -330,7 +335,7 @@ func (n *Node) UpdateStatus(args *structs.NodeUpdateStatusRequest, reply *struct
switch args.Status {
case structs.NodeStatusDown:
// Determine if there are any Vault accessors on the node
accessors, err := n.srv.State().VaultAccessorsByNode(args.NodeID)
accessors, err := n.srv.State().VaultAccessorsByNode(ws, args.NodeID)
if err != nil {
n.srv.logger.Printf("[ERR] nomad.client: looking up accessors for node %q failed: %v", args.NodeID, err)
return err
@@ -389,7 +394,8 @@ func (n *Node) UpdateDrain(args *structs.NodeUpdateDrainRequest,
if err != nil {
return err
}
node, err := snap.NodeByID(args.NodeID)
ws := memdb.NewWatchSet()
node, err := snap.NodeByID(ws, args.NodeID)
if err != nil {
return err
}
@@ -443,7 +449,8 @@ func (n *Node) Evaluate(args *structs.NodeEvaluateRequest, reply *structs.NodeUp
if err != nil {
return err
}
node, err := snap.NodeByID(args.NodeID)
ws := memdb.NewWatchSet()
node, err := snap.NodeByID(ws, args.NodeID)
if err != nil {
return err
}
@@ -484,19 +491,14 @@ func (n *Node) GetNode(args *structs.NodeSpecificRequest,
opts := blockingOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
watch: watch.NewItems(watch.Item{Node: args.NodeID}),
run: func() error {
run: func(ws memdb.WatchSet, state *state.StateStore) error {
// Verify the arguments
if args.NodeID == "" {
return fmt.Errorf("missing node ID")
}
// Look for the node
snap, err := n.srv.fsm.State().Snapshot()
if err != nil {
return err
}
out, err := snap.NodeByID(args.NodeID)
out, err := state.NodeByID(ws, args.NodeID)
if err != nil {
return err
}
@@ -509,7 +511,7 @@ func (n *Node) GetNode(args *structs.NodeSpecificRequest,
reply.Index = out.ModifyIndex
} else {
// Use the last index that affected the nodes table
index, err := snap.Index("nodes")
index, err := state.Index("nodes")
if err != nil {
return err
}
@@ -541,14 +543,9 @@ func (n *Node) GetAllocs(args *structs.NodeSpecificRequest,
opts := blockingOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
watch: watch.NewItems(watch.Item{AllocNode: args.NodeID}),
run: func() error {
run: func(ws memdb.WatchSet, state *state.StateStore) error {
// Look for the node
snap, err := n.srv.fsm.State().Snapshot()
if err != nil {
return err
}
allocs, err := snap.AllocsByNode(args.NodeID)
allocs, err := state.AllocsByNode(ws, args.NodeID)
if err != nil {
return err
}
@@ -563,7 +560,7 @@ func (n *Node) GetAllocs(args *structs.NodeSpecificRequest,
reply.Allocs = nil
// Use the last index that affected the nodes table
index, err := snap.Index("allocs")
index, err := state.Index("allocs")
if err != nil {
return err
}
@@ -599,16 +596,9 @@ func (n *Node) GetClientAllocs(args *structs.NodeSpecificRequest,
opts := blockingOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
watch: watch.NewItems(watch.Item{AllocNode: args.NodeID}),
run: func() error {
run: func(ws memdb.WatchSet, state *state.StateStore) error {
// Look for the node
snap, err := n.srv.fsm.State().Snapshot()
if err != nil {
return err
}
// Look for the node
node, err := snap.NodeByID(args.NodeID)
node, err := state.NodeByID(ws, args.NodeID)
if err != nil {
return err
}
@@ -628,7 +618,7 @@ func (n *Node) GetClientAllocs(args *structs.NodeSpecificRequest,
}
var err error
allocs, err = snap.AllocsByNode(args.NodeID)
allocs, err = state.AllocsByNode(ws, args.NodeID)
if err != nil {
return err
}
@@ -643,7 +633,7 @@ func (n *Node) GetClientAllocs(args *structs.NodeSpecificRequest,
}
} else {
// Use the last index that affected the nodes table
index, err := snap.Index("allocs")
index, err := state.Index("allocs")
if err != nil {
return err
}
@@ -734,7 +724,8 @@ func (n *Node) batchUpdate(future *batchFuture, updates []*structs.Allocation) {
}
// Determine if there are any Vault accessors for the allocation
accessors, err := n.srv.State().VaultAccessorsByAlloc(alloc.ID)
ws := memdb.NewWatchSet()
accessors, err := n.srv.State().VaultAccessorsByAlloc(ws, alloc.ID)
if err != nil {
n.srv.logger.Printf("[ERR] nomad.client: looking up accessors for alloc %q failed: %v", alloc.ID, err)
mErr.Errors = append(mErr.Errors, err)
@@ -766,18 +757,14 @@ func (n *Node) List(args *structs.NodeListRequest,
opts := blockingOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
watch: watch.NewItems(watch.Item{Table: "nodes"}),
run: func() error {
run: func(ws memdb.WatchSet, state *state.StateStore) error {
// Capture all the nodes
snap, err := n.srv.fsm.State().Snapshot()
if err != nil {
return err
}
var err error
var iter memdb.ResultIterator
if prefix := args.QueryOptions.Prefix; prefix != "" {
iter, err = snap.NodesByIDPrefix(prefix)
iter, err = state.NodesByIDPrefix(ws, prefix)
} else {
iter, err = snap.Nodes()
iter, err = state.Nodes(ws)
}
if err != nil {
return err
@@ -795,7 +782,7 @@ func (n *Node) List(args *structs.NodeListRequest,
reply.Nodes = nodes
// Use the last index that affected the jobs table
index, err := snap.Index("nodes")
index, err := state.Index("nodes")
if err != nil {
return err
}
@@ -818,12 +805,13 @@ func (n *Node) createNodeEvals(nodeID string, nodeIndex uint64) ([]string, uint6
}
// Find all the allocations for this node
allocs, err := snap.AllocsByNode(nodeID)
ws := memdb.NewWatchSet()
allocs, err := snap.AllocsByNode(ws, nodeID)
if err != nil {
return nil, 0, fmt.Errorf("failed to find allocs for '%s': %v", nodeID, err)
}
sysJobsIter, err := snap.JobsByScheduler("system")
sysJobsIter, err := snap.JobsByScheduler(ws, "system")
if err != nil {
return nil, 0, fmt.Errorf("failed to find system jobs for '%s': %v", nodeID, err)
}
@@ -985,7 +973,8 @@ func (n *Node) DeriveVaultToken(args *structs.DeriveVaultTokenRequest,
setErr(err, false)
return nil
}
node, err := snap.NodeByID(args.NodeID)
ws := memdb.NewWatchSet()
node, err := snap.NodeByID(ws, args.NodeID)
if err != nil {
setErr(err, false)
return nil
@@ -999,7 +988,7 @@ func (n *Node) DeriveVaultToken(args *structs.DeriveVaultTokenRequest,
return nil
}
alloc, err := snap.AllocByID(args.AllocID)
alloc, err := snap.AllocByID(ws, args.AllocID)
if err != nil {
setErr(err, false)
return nil

View File

@@ -9,6 +9,7 @@ import (
"sync"
"time"
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/nomad/structs"
)
@@ -86,8 +87,9 @@ func (s *Server) RunningChildren(job *structs.Job) (bool, error) {
return false, err
}
ws := memdb.NewWatchSet()
prefix := fmt.Sprintf("%s%s", job.ID, structs.PeriodicLaunchSuffix)
iter, err := state.JobsByIDPrefix(prefix)
iter, err := state.JobsByIDPrefix(ws, prefix)
if err != nil {
return false, err
}
@@ -102,7 +104,7 @@ func (s *Server) RunningChildren(job *structs.Job) (bool, error) {
}
// Get the childs evaluations.
evals, err := state.EvalsByJob(child.ID)
evals, err := state.EvalsByJob(ws, child.ID)
if err != nil {
return false, err
}
@@ -113,7 +115,7 @@ func (s *Server) RunningChildren(job *structs.Job) (bool, error) {
return true, nil
}
allocs, err := state.AllocsByEval(eval.ID)
allocs, err := state.AllocsByEval(ws, eval.ID)
if err != nil {
return false, err
}

View File

@@ -5,6 +5,7 @@ import (
"time"
"github.com/armon/go-metrics"
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/nomad/structs"
)
@@ -30,7 +31,9 @@ func (p *Periodic) Force(args *structs.PeriodicForceRequest, reply *structs.Peri
if err != nil {
return err
}
job, err := snap.JobByID(args.JobID)
ws := memdb.NewWatchSet()
job, err := snap.JobByID(ws, args.JobID)
if err != nil {
return err
}

View File

@@ -6,6 +6,7 @@ import (
"time"
"github.com/armon/go-metrics"
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
@@ -322,7 +323,8 @@ func evaluateNodePlan(snap *state.StateSnapshot, plan *structs.Plan, nodeID stri
}
// Get the node itself
node, err := snap.NodeByID(nodeID)
ws := memdb.NewWatchSet()
node, err := snap.NodeByID(ws, nodeID)
if err != nil {
return false, fmt.Errorf("failed to get node '%s': %v", nodeID, err)
}
@@ -335,7 +337,7 @@ func evaluateNodePlan(snap *state.StateSnapshot, plan *structs.Plan, nodeID stri
}
// Get the existing allocations that are non-terminal
existingAlloc, err := snap.AllocsByNodeTerminal(nodeID, false)
existingAlloc, err := snap.AllocsByNodeTerminal(ws, nodeID, false)
if err != nil {
return false, fmt.Errorf("failed to get existing allocations for '%s': %v", nodeID, err)
}

View File

@@ -16,7 +16,6 @@ import (
"github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/nomad/watch"
"github.com/hashicorp/raft"
"github.com/hashicorp/yamux"
)
@@ -333,7 +332,6 @@ type queryFn func(memdb.WatchSet, *state.StateStore) error
type blockingOptions struct {
queryOpts *structs.QueryOptions
queryMeta *structs.QueryMeta
watch watch.Items
run queryFn
}
@@ -376,7 +374,7 @@ RUN_QUERY:
// We can skip all watch tracking if this isn't a blocking query.
var ws memdb.WatchSet
if queryOpts.MinQueryIndex > 0 {
if opts.queryOpts.MinQueryIndex > 0 {
ws = memdb.NewWatchSet()
// This channel will be closed if a snapshot is restored and the
@@ -385,12 +383,11 @@ RUN_QUERY:
}
// Block up to the timeout if we didn't see anything fresh.
err := fn(ws, state)
err := opts.run(ws, state)
// Check for minimum query time
if err == nil && opts.queryOpts.MinQueryIndex > 0 && opts.queryMeta.Index <= opts.queryOpts.MinQueryIndex {
if expired := ws.Watch(timeout.C); !expired {
// XXX James can do this behavior too
goto RUN_QUERY
}
}

View File

@@ -8,6 +8,7 @@ import (
"time"
"github.com/armon/go-metrics"
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/scheduler"
)
@@ -446,7 +447,8 @@ func (w *Worker) ReblockEval(eval *structs.Evaluation) error {
// Update the evaluation if the queued jobs is not same as what is
// recorded in the job summary
summary, err := w.srv.fsm.state.JobSummaryByID(eval.JobID)
ws := memdb.NewWatchSet()
summary, err := w.srv.fsm.state.JobSummaryByID(ws, eval.JobID)
if err != nil {
return fmt.Errorf("couldn't retreive job summary: %v", err)
}

View File

@@ -4,6 +4,7 @@ import (
"log"
"regexp"
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-version"
"github.com/hashicorp/nomad/nomad/structs"
)
@@ -107,7 +108,8 @@ func (e *EvalContext) Reset() {
func (e *EvalContext) ProposedAllocs(nodeID string) ([]*structs.Allocation, error) {
// Get the existing allocations that are non-terminal
existingAlloc, err := e.state.AllocsByNodeTerminal(nodeID, false)
ws := memdb.NewWatchSet()
existingAlloc, err := e.state.AllocsByNodeTerminal(ws, nodeID, false)
if err != nil {
return nil, err
}

View File

@@ -4,6 +4,7 @@ import (
"fmt"
"log"
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/nomad/structs"
)
@@ -183,7 +184,8 @@ func (s *GenericScheduler) createBlockedEval(planFailure bool) error {
func (s *GenericScheduler) process() (bool, error) {
// Lookup the Job by ID
var err error
s.job, err = s.state.JobByID(s.eval.JobID)
ws := memdb.NewWatchSet()
s.job, err = s.state.JobByID(ws, s.eval.JobID)
if err != nil {
return false, fmt.Errorf("failed to get job '%s': %v",
s.eval.JobID, err)
@@ -354,7 +356,8 @@ func (s *GenericScheduler) computeJobAllocs() error {
}
// Lookup the allocations by JobID
allocs, err := s.state.AllocsByJob(s.eval.JobID, true)
ws := memdb.NewWatchSet()
allocs, err := s.state.AllocsByJob(ws, s.eval.JobID, true)
if err != nil {
return fmt.Errorf("failed to get allocs for job '%s': %v",
s.eval.JobID, err)
@@ -513,7 +516,8 @@ func (s *GenericScheduler) findPreferredNode(allocTuple *allocTuple) (node *stru
}
if taskGroup.EphemeralDisk.Sticky == true {
var preferredNode *structs.Node
preferredNode, err = s.state.NodeByID(allocTuple.Alloc.NodeID)
ws := memdb.NewWatchSet()
preferredNode, err = s.state.NodeByID(ws, allocTuple.Alloc.NodeID)
if preferredNode.Ready() {
node = preferredNode
}

View File

@@ -63,22 +63,22 @@ type Scheduler interface {
type State interface {
// Nodes returns an iterator over all the nodes.
// The type of each result is *structs.Node
Nodes() (memdb.ResultIterator, error)
Nodes(ws memdb.WatchSet) (memdb.ResultIterator, error)
// AllocsByJob returns the allocations by JobID
AllocsByJob(jobID string, all bool) ([]*structs.Allocation, error)
AllocsByJob(ws memdb.WatchSet, jobID string, all bool) ([]*structs.Allocation, error)
// AllocsByNode returns all the allocations by node
AllocsByNode(node string) ([]*structs.Allocation, error)
AllocsByNode(ws memdb.WatchSet, node string) ([]*structs.Allocation, error)
// AllocsByNodeTerminal returns all the allocations by node filtering by terminal status
AllocsByNodeTerminal(node string, terminal bool) ([]*structs.Allocation, error)
AllocsByNodeTerminal(ws memdb.WatchSet, node string, terminal bool) ([]*structs.Allocation, error)
// GetNodeByID is used to lookup a node by ID
NodeByID(nodeID string) (*structs.Node, error)
NodeByID(ws memdb.WatchSet, nodeID string) (*structs.Node, error)
// GetJobByID is used to lookup a job by ID
JobByID(id string) (*structs.Job, error)
JobByID(ws memdb.WatchSet, id string) (*structs.Job, error)
}
// Planner interface is used to submit a task allocation plan.

View File

@@ -4,6 +4,7 @@ import (
"fmt"
"log"
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/nomad/structs"
)
@@ -87,7 +88,8 @@ func (s *SystemScheduler) Process(eval *structs.Evaluation) error {
func (s *SystemScheduler) process() (bool, error) {
// Lookup the Job by ID
var err error
s.job, err = s.state.JobByID(s.eval.JobID)
ws := memdb.NewWatchSet()
s.job, err = s.state.JobByID(ws, s.eval.JobID)
if err != nil {
return false, fmt.Errorf("failed to get job '%s': %v",
s.eval.JobID, err)
@@ -178,7 +180,8 @@ func (s *SystemScheduler) process() (bool, error) {
// existing allocations and node status to update the allocations.
func (s *SystemScheduler) computeJobAllocs() error {
// Lookup the allocations by JobID
allocs, err := s.state.AllocsByJob(s.eval.JobID, true)
ws := memdb.NewWatchSet()
allocs, err := s.state.AllocsByJob(ws, s.eval.JobID, true)
if err != nil {
return fmt.Errorf("failed to get allocs for job '%s': %v",
s.eval.JobID, err)

View File

@@ -7,6 +7,7 @@ import (
"sync"
"testing"
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
)
@@ -159,7 +160,8 @@ func (h *Harness) ReblockEval(eval *structs.Evaluation) error {
defer h.planLock.Unlock()
// Check that the evaluation was already blocked.
old, err := h.State.EvalByID(eval.ID)
ws := memdb.NewWatchSet()
old, err := h.State.EvalByID(ws, eval.ID)
if err != nil {
return err
}

View File

@@ -6,6 +6,7 @@ import (
"math/rand"
"reflect"
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/nomad/structs"
)
@@ -228,8 +229,9 @@ func readyNodesInDCs(state State, dcs []string) ([]*structs.Node, map[string]int
}
// Scan the nodes
ws := memdb.NewWatchSet()
var out []*structs.Node
iter, err := state.Nodes()
iter, err := state.Nodes(ws)
if err != nil {
return nil, nil, err
}
@@ -301,7 +303,8 @@ func taintedNodes(state State, allocs []*structs.Allocation) (map[string]*struct
continue
}
node, err := state.NodeByID(alloc.NodeID)
ws := memdb.NewWatchSet()
node, err := state.NodeByID(ws, alloc.NodeID)
if err != nil {
return nil, err
}
@@ -452,6 +455,7 @@ func setStatus(logger *log.Logger, planner Planner,
func inplaceUpdate(ctx Context, eval *structs.Evaluation, job *structs.Job,
stack Stack, updates []allocTuple) (destructive, inplace []allocTuple) {
ws := memdb.NewWatchSet()
n := len(updates)
inplaceCount := 0
for i := 0; i < n; i++ {
@@ -471,7 +475,7 @@ func inplaceUpdate(ctx Context, eval *structs.Evaluation, job *structs.Job,
}
// Get the existing node
node, err := ctx.State().NodeByID(update.Alloc.NodeID)
node, err := ctx.State().NodeByID(ws, update.Alloc.NodeID)
if err != nil {
ctx.Logger().Printf("[ERR] sched: %#v failed to get node '%s': %v",
eval, update.Alloc.NodeID, err)