From 4cbc72678e8df6de38d5fb905efba68db1b302f0 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Sun, 5 Feb 2017 12:45:57 -0800 Subject: [PATCH] Use watchset on getter methods --- nomad/state/state_store.go | 361 ++++++++++++------------------------- 1 file changed, 120 insertions(+), 241 deletions(-) diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 6ce88c659..780640e59 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -4,11 +4,9 @@ import ( "fmt" "io" "log" - "sync" "github.com/hashicorp/go-memdb" "github.com/hashicorp/nomad/nomad/structs" - "github.com/hashicorp/nomad/nomad/watch" ) // IndexEntry is used with the "index" table @@ -28,7 +26,6 @@ type IndexEntry struct { type StateStore struct { logger *log.Logger db *memdb.MemDB - watch *stateWatch // abandonCh is used to signal watchers that this state store has been // abandoned (usually during a restore). This is only ever closed. @@ -47,7 +44,6 @@ func NewStateStore(logOutput io.Writer) (*StateStore, error) { s := &StateStore{ logger: log.New(logOutput, "", log.LstdFlags), db: db, - watch: newStateWatch(), abandonCh: make(chan struct{}), } return s, nil @@ -61,7 +57,6 @@ func (s *StateStore) Snapshot() (*StateSnapshot, error) { StateStore: StateStore{ logger: s.logger, db: s.db.Snapshot(), - watch: s.watch, }, } return snap, nil @@ -73,18 +68,11 @@ func (s *StateStore) Snapshot() (*StateSnapshot, error) { func (s *StateStore) Restore() (*StateRestore, error) { txn := s.db.Txn(true) r := &StateRestore{ - txn: txn, - watch: s.watch, - items: watch.NewItems(), + txn: txn, } return r, nil } -// Watch subscribes a channel to a set of watch items. -func (s *StateStore) Watch(items watch.Items, notify chan struct{}) { - s.watch.watch(items, notify) -} - // AbandonCh returns a channel you can wait on to know if the state store was // abandoned. func (s *StateStore) AbandonCh() <-chan struct{} { @@ -140,10 +128,6 @@ func (s *StateStore) UpsertNode(index uint64, node *structs.Node) error { txn := s.db.Txn(true) defer txn.Abort() - watcher := watch.NewItems() - watcher.Add(watch.Item{Table: "nodes"}) - watcher.Add(watch.Item{Node: node.ID}) - // Check if the node already exists existing, err := txn.First("nodes", "id", node.ID) if err != nil { @@ -169,7 +153,6 @@ func (s *StateStore) UpsertNode(index uint64, node *structs.Node) error { return fmt.Errorf("index update failed: %v", err) } - txn.Defer(func() { s.watch.notify(watcher) }) txn.Commit() return nil } @@ -188,10 +171,6 @@ func (s *StateStore) DeleteNode(index uint64, nodeID string) error { return fmt.Errorf("node not found") } - watcher := watch.NewItems() - watcher.Add(watch.Item{Table: "nodes"}) - watcher.Add(watch.Item{Node: nodeID}) - // Delete the node if err := txn.Delete("nodes", existing); err != nil { return fmt.Errorf("node delete failed: %v", err) @@ -200,7 +179,6 @@ func (s *StateStore) DeleteNode(index uint64, nodeID string) error { return fmt.Errorf("index update failed: %v", err) } - txn.Defer(func() { s.watch.notify(watcher) }) txn.Commit() return nil } @@ -210,10 +188,6 @@ func (s *StateStore) UpdateNodeStatus(index uint64, nodeID, status string) error txn := s.db.Txn(true) defer txn.Abort() - watcher := watch.NewItems() - watcher.Add(watch.Item{Table: "nodes"}) - watcher.Add(watch.Item{Node: nodeID}) - // Lookup the node existing, err := txn.First("nodes", "id", nodeID) if err != nil { @@ -240,7 +214,6 @@ func (s *StateStore) UpdateNodeStatus(index uint64, nodeID, status string) error return fmt.Errorf("index update failed: %v", err) } - txn.Defer(func() { s.watch.notify(watcher) }) txn.Commit() return nil } @@ -250,10 +223,6 @@ func (s *StateStore) UpdateNodeDrain(index uint64, nodeID string, drain bool) er txn := s.db.Txn(true) defer txn.Abort() - watcher := watch.NewItems() - watcher.Add(watch.Item{Table: "nodes"}) - watcher.Add(watch.Item{Node: nodeID}) - // Lookup the node existing, err := txn.First("nodes", "id", nodeID) if err != nil { @@ -280,19 +249,19 @@ func (s *StateStore) UpdateNodeDrain(index uint64, nodeID string, drain bool) er return fmt.Errorf("index update failed: %v", err) } - txn.Defer(func() { s.watch.notify(watcher) }) txn.Commit() return nil } // NodeByID is used to lookup a node by ID -func (s *StateStore) NodeByID(nodeID string) (*structs.Node, error) { +func (s *StateStore) NodeByID(ws memdb.WatchSet, nodeID string) (*structs.Node, error) { txn := s.db.Txn(false) - existing, err := txn.First("nodes", "id", nodeID) + watchCh, existing, err := txn.FirstWatch("nodes", "id", nodeID) if err != nil { return nil, fmt.Errorf("node lookup failed: %v", err) } + ws.Add(watchCh) if existing != nil { return existing.(*structs.Node), nil @@ -301,19 +270,20 @@ func (s *StateStore) NodeByID(nodeID string) (*structs.Node, error) { } // NodesByIDPrefix is used to lookup nodes by prefix -func (s *StateStore) NodesByIDPrefix(nodeID string) (memdb.ResultIterator, error) { +func (s *StateStore) NodesByIDPrefix(ws memdb.WatchSet, nodeID string) (memdb.ResultIterator, error) { txn := s.db.Txn(false) iter, err := txn.Get("nodes", "id_prefix", nodeID) if err != nil { return nil, fmt.Errorf("node lookup failed: %v", err) } + ws.Add(iter.WatchCh()) return iter, nil } // Nodes returns an iterator over all the nodes -func (s *StateStore) Nodes() (memdb.ResultIterator, error) { +func (s *StateStore) Nodes(ws memdb.WatchSet) (memdb.ResultIterator, error) { txn := s.db.Txn(false) // Walk the entire nodes table @@ -321,6 +291,7 @@ func (s *StateStore) Nodes() (memdb.ResultIterator, error) { if err != nil { return nil, err } + ws.Add(iter.WatchCh()) return iter, nil } @@ -329,10 +300,6 @@ func (s *StateStore) UpsertJob(index uint64, job *structs.Job) error { txn := s.db.Txn(true) defer txn.Abort() - watcher := watch.NewItems() - watcher.Add(watch.Item{Table: "jobs"}) - watcher.Add(watch.Item{Job: job.ID}) - // Check if the job already exists existing, err := txn.First("jobs", "id", job.ID) if err != nil { @@ -356,7 +323,7 @@ func (s *StateStore) UpsertJob(index uint64, job *structs.Job) error { job.ModifyIndex = index job.JobModifyIndex = index - if err := s.setJobStatus(index, watcher, txn, job, false, ""); err != nil { + if err := s.setJobStatus(index, txn, job, false, ""); err != nil { return fmt.Errorf("setting job status for %q failed: %v", job.ID, err) } @@ -370,7 +337,7 @@ func (s *StateStore) UpsertJob(index uint64, job *structs.Job) error { } } - if err := s.updateSummaryWithJob(index, job, watcher, txn); err != nil { + if err := s.updateSummaryWithJob(index, job, txn); err != nil { return fmt.Errorf("unable to create job summary: %v", err) } @@ -386,7 +353,6 @@ func (s *StateStore) UpsertJob(index uint64, job *structs.Job) error { return fmt.Errorf("index update failed: %v", err) } - txn.Defer(func() { s.watch.notify(watcher) }) txn.Commit() return nil } @@ -405,12 +371,6 @@ func (s *StateStore) DeleteJob(index uint64, jobID string) error { return fmt.Errorf("job not found") } - watcher := watch.NewItems() - watcher.Add(watch.Item{Table: "jobs"}) - watcher.Add(watch.Item{Job: jobID}) - watcher.Add(watch.Item{Table: "job_summary"}) - watcher.Add(watch.Item{JobSummary: jobID}) - // Check if we should update a parent job summary job := existing.(*structs.Job) if job.ParentID != "" { @@ -445,9 +405,6 @@ func (s *StateStore) DeleteJob(index uint64, jobID string) error { // Update the modify index pSummary.ModifyIndex = index - watcher.Add(watch.Item{Table: "job_summary"}) - watcher.Add(watch.Item{JobSummary: job.ParentID}) - // Insert the summary if err := txn.Insert("job_summary", pSummary); err != nil { return fmt.Errorf("job summary insert failed: %v", err) @@ -476,19 +433,19 @@ func (s *StateStore) DeleteJob(index uint64, jobID string) error { return fmt.Errorf("index update failed: %v", err) } - txn.Defer(func() { s.watch.notify(watcher) }) txn.Commit() return nil } // JobByID is used to lookup a job by its ID -func (s *StateStore) JobByID(id string) (*structs.Job, error) { +func (s *StateStore) JobByID(ws memdb.WatchSet, id string) (*structs.Job, error) { txn := s.db.Txn(false) - existing, err := txn.First("jobs", "id", id) + watchCh, existing, err := txn.FirstWatch("jobs", "id", id) if err != nil { return nil, fmt.Errorf("job lookup failed: %v", err) } + ws.Add(watchCh) if existing != nil { return existing.(*structs.Job), nil @@ -497,7 +454,7 @@ func (s *StateStore) JobByID(id string) (*structs.Job, error) { } // JobsByIDPrefix is used to lookup a job by prefix -func (s *StateStore) JobsByIDPrefix(id string) (memdb.ResultIterator, error) { +func (s *StateStore) JobsByIDPrefix(ws memdb.WatchSet, id string) (memdb.ResultIterator, error) { txn := s.db.Txn(false) iter, err := txn.Get("jobs", "id_prefix", id) @@ -505,11 +462,13 @@ func (s *StateStore) JobsByIDPrefix(id string) (memdb.ResultIterator, error) { return nil, fmt.Errorf("job lookup failed: %v", err) } + ws.Add(iter.WatchCh()) + return iter, nil } // Jobs returns an iterator over all the jobs -func (s *StateStore) Jobs() (memdb.ResultIterator, error) { +func (s *StateStore) Jobs(ws memdb.WatchSet) (memdb.ResultIterator, error) { txn := s.db.Txn(false) // Walk the entire jobs table @@ -517,23 +476,29 @@ func (s *StateStore) Jobs() (memdb.ResultIterator, error) { if err != nil { return nil, err } + + ws.Add(iter.WatchCh()) + return iter, nil } // JobsByPeriodic returns an iterator over all the periodic or non-periodic jobs. -func (s *StateStore) JobsByPeriodic(periodic bool) (memdb.ResultIterator, error) { +func (s *StateStore) JobsByPeriodic(ws memdb.WatchSet, periodic bool) (memdb.ResultIterator, error) { txn := s.db.Txn(false) iter, err := txn.Get("jobs", "periodic", periodic) if err != nil { return nil, err } + + ws.Add(iter.WatchCh()) + return iter, nil } // JobsByScheduler returns an iterator over all the jobs with the specific // scheduler type. -func (s *StateStore) JobsByScheduler(schedulerType string) (memdb.ResultIterator, error) { +func (s *StateStore) JobsByScheduler(ws memdb.WatchSet, schedulerType string) (memdb.ResultIterator, error) { txn := s.db.Txn(false) // Return an iterator for jobs with the specific type. @@ -541,29 +506,38 @@ func (s *StateStore) JobsByScheduler(schedulerType string) (memdb.ResultIterator if err != nil { return nil, err } + + ws.Add(iter.WatchCh()) + return iter, nil } // JobsByGC returns an iterator over all jobs eligible or uneligible for garbage // collection. -func (s *StateStore) JobsByGC(gc bool) (memdb.ResultIterator, error) { +func (s *StateStore) JobsByGC(ws memdb.WatchSet, gc bool) (memdb.ResultIterator, error) { txn := s.db.Txn(false) iter, err := txn.Get("jobs", "gc", gc) if err != nil { return nil, err } + + ws.Add(iter.WatchCh()) + return iter, nil } // JobSummary returns a job summary object which matches a specific id. -func (s *StateStore) JobSummaryByID(jobID string) (*structs.JobSummary, error) { +func (s *StateStore) JobSummaryByID(ws memdb.WatchSet, jobID string) (*structs.JobSummary, error) { txn := s.db.Txn(false) - existing, err := txn.First("job_summary", "id", jobID) + watchCh, existing, err := txn.FirstWatch("job_summary", "id", jobID) if err != nil { return nil, err } + + ws.Add(watchCh) + if existing != nil { summary := existing.(*structs.JobSummary) return summary, nil @@ -574,18 +548,21 @@ func (s *StateStore) JobSummaryByID(jobID string) (*structs.JobSummary, error) { // JobSummaries walks the entire job summary table and returns all the job // summary objects -func (s *StateStore) JobSummaries() (memdb.ResultIterator, error) { +func (s *StateStore) JobSummaries(ws memdb.WatchSet) (memdb.ResultIterator, error) { txn := s.db.Txn(false) iter, err := txn.Get("job_summary", "id") if err != nil { return nil, err } + + ws.Add(iter.WatchCh()) + return iter, nil } // JobSummaryByPrefix is used to look up Job Summary by id prefix -func (s *StateStore) JobSummaryByPrefix(id string) (memdb.ResultIterator, error) { +func (s *StateStore) JobSummaryByPrefix(ws memdb.WatchSet, id string) (memdb.ResultIterator, error) { txn := s.db.Txn(false) iter, err := txn.Get("job_summary", "id_prefix", id) @@ -593,6 +570,8 @@ func (s *StateStore) JobSummaryByPrefix(id string) (memdb.ResultIterator, error) return nil, fmt.Errorf("eval lookup failed: %v", err) } + ws.Add(iter.WatchCh()) + return iter, nil } @@ -601,10 +580,6 @@ func (s *StateStore) UpsertPeriodicLaunch(index uint64, launch *structs.Periodic txn := s.db.Txn(true) defer txn.Abort() - watcher := watch.NewItems() - watcher.Add(watch.Item{Table: "periodic_launch"}) - watcher.Add(watch.Item{Job: launch.ID}) - // Check if the job already exists existing, err := txn.First("periodic_launch", "id", launch.ID) if err != nil { @@ -628,7 +603,6 @@ func (s *StateStore) UpsertPeriodicLaunch(index uint64, launch *structs.Periodic return fmt.Errorf("index update failed: %v", err) } - txn.Defer(func() { s.watch.notify(watcher) }) txn.Commit() return nil } @@ -647,10 +621,6 @@ func (s *StateStore) DeletePeriodicLaunch(index uint64, jobID string) error { return fmt.Errorf("launch not found") } - watcher := watch.NewItems() - watcher.Add(watch.Item{Table: "periodic_launch"}) - watcher.Add(watch.Item{Job: jobID}) - // Delete the launch if err := txn.Delete("periodic_launch", existing); err != nil { return fmt.Errorf("launch delete failed: %v", err) @@ -659,21 +629,22 @@ func (s *StateStore) DeletePeriodicLaunch(index uint64, jobID string) error { return fmt.Errorf("index update failed: %v", err) } - txn.Defer(func() { s.watch.notify(watcher) }) txn.Commit() return nil } // PeriodicLaunchByID is used to lookup a periodic launch by the periodic job // ID. -func (s *StateStore) PeriodicLaunchByID(id string) (*structs.PeriodicLaunch, error) { +func (s *StateStore) PeriodicLaunchByID(ws memdb.WatchSet, id string) (*structs.PeriodicLaunch, error) { txn := s.db.Txn(false) - existing, err := txn.First("periodic_launch", "id", id) + watchCh, existing, err := txn.FirstWatch("periodic_launch", "id", id) if err != nil { return nil, fmt.Errorf("periodic launch lookup failed: %v", err) } + ws.Add(watchCh) + if existing != nil { return existing.(*structs.PeriodicLaunch), nil } @@ -681,7 +652,7 @@ func (s *StateStore) PeriodicLaunchByID(id string) (*structs.PeriodicLaunch, err } // PeriodicLaunches returns an iterator over all the periodic launches -func (s *StateStore) PeriodicLaunches() (memdb.ResultIterator, error) { +func (s *StateStore) PeriodicLaunches(ws memdb.WatchSet) (memdb.ResultIterator, error) { txn := s.db.Txn(false) // Walk the entire table @@ -689,6 +660,9 @@ func (s *StateStore) PeriodicLaunches() (memdb.ResultIterator, error) { if err != nil { return nil, err } + + ws.Add(iter.WatchCh()) + return iter, nil } @@ -697,15 +671,10 @@ func (s *StateStore) UpsertEvals(index uint64, evals []*structs.Evaluation) erro txn := s.db.Txn(true) defer txn.Abort() - watcher := watch.NewItems() - watcher.Add(watch.Item{Table: "evals"}) - // Do a nested upsert jobs := make(map[string]string, len(evals)) for _, eval := range evals { - watcher.Add(watch.Item{Eval: eval.ID}) - watcher.Add(watch.Item{EvalJob: eval.JobID}) - if err := s.nestedUpsertEval(txn, watcher, index, eval); err != nil { + if err := s.nestedUpsertEval(txn, index, eval); err != nil { return err } @@ -713,17 +682,16 @@ func (s *StateStore) UpsertEvals(index uint64, evals []*structs.Evaluation) erro } // Set the job's status - if err := s.setJobStatuses(index, watcher, txn, jobs, false); err != nil { + if err := s.setJobStatuses(index, txn, jobs, false); err != nil { return fmt.Errorf("setting job status failed: %v", err) } - txn.Defer(func() { s.watch.notify(watcher) }) txn.Commit() return nil } // nestedUpsertEvaluation is used to nest an evaluation upsert within a transaction -func (s *StateStore) nestedUpsertEval(txn *memdb.Txn, watcher watch.Items, index uint64, eval *structs.Evaluation) error { +func (s *StateStore) nestedUpsertEval(txn *memdb.Txn, index uint64, eval *structs.Evaluation) error { // Lookup the evaluation existing, err := txn.First("evals", "id", eval.ID) if err != nil { @@ -797,8 +765,6 @@ func (s *StateStore) nestedUpsertEval(txn *memdb.Txn, watcher watch.Items, index if err := txn.Insert("evals", newEval); err != nil { return fmt.Errorf("eval insert failed: %v", err) } - - watcher.Add(watch.Item{Eval: newEval.ID}) } } @@ -816,9 +782,6 @@ func (s *StateStore) nestedUpsertEval(txn *memdb.Txn, watcher watch.Items, index func (s *StateStore) DeleteEval(index uint64, evals []string, allocs []string) error { txn := s.db.Txn(true) defer txn.Abort() - watcher := watch.NewItems() - watcher.Add(watch.Item{Table: "evals"}) - watcher.Add(watch.Item{Table: "allocs"}) jobs := make(map[string]string, len(evals)) for _, eval := range evals { @@ -833,8 +796,6 @@ func (s *StateStore) DeleteEval(index uint64, evals []string, allocs []string) e return fmt.Errorf("eval delete failed: %v", err) } jobID := existing.(*structs.Evaluation).JobID - watcher.Add(watch.Item{Eval: eval}) - watcher.Add(watch.Item{EvalJob: jobID}) jobs[jobID] = "" } @@ -849,11 +810,6 @@ func (s *StateStore) DeleteEval(index uint64, evals []string, allocs []string) e if err := txn.Delete("allocs", existing); err != nil { return fmt.Errorf("alloc delete failed: %v", err) } - realAlloc := existing.(*structs.Allocation) - watcher.Add(watch.Item{Alloc: realAlloc.ID}) - watcher.Add(watch.Item{AllocEval: realAlloc.EvalID}) - watcher.Add(watch.Item{AllocJob: realAlloc.JobID}) - watcher.Add(watch.Item{AllocNode: realAlloc.NodeID}) } // Update the indexes @@ -865,24 +821,25 @@ func (s *StateStore) DeleteEval(index uint64, evals []string, allocs []string) e } // Set the job's status - if err := s.setJobStatuses(index, watcher, txn, jobs, true); err != nil { + if err := s.setJobStatuses(index, txn, jobs, true); err != nil { return fmt.Errorf("setting job status failed: %v", err) } - txn.Defer(func() { s.watch.notify(watcher) }) txn.Commit() return nil } // EvalByID is used to lookup an eval by its ID -func (s *StateStore) EvalByID(id string) (*structs.Evaluation, error) { +func (s *StateStore) EvalByID(ws memdb.WatchSet, id string) (*structs.Evaluation, error) { txn := s.db.Txn(false) - existing, err := txn.First("evals", "id", id) + watchCh, existing, err := txn.FirstWatch("evals", "id", id) if err != nil { return nil, fmt.Errorf("eval lookup failed: %v", err) } + ws.Add(watchCh) + if existing != nil { return existing.(*structs.Evaluation), nil } @@ -890,7 +847,7 @@ func (s *StateStore) EvalByID(id string) (*structs.Evaluation, error) { } // EvalsByIDPrefix is used to lookup evaluations by prefix -func (s *StateStore) EvalsByIDPrefix(id string) (memdb.ResultIterator, error) { +func (s *StateStore) EvalsByIDPrefix(ws memdb.WatchSet, id string) (memdb.ResultIterator, error) { txn := s.db.Txn(false) iter, err := txn.Get("evals", "id_prefix", id) @@ -898,11 +855,13 @@ func (s *StateStore) EvalsByIDPrefix(id string) (memdb.ResultIterator, error) { return nil, fmt.Errorf("eval lookup failed: %v", err) } + ws.Add(iter.WatchCh()) + return iter, nil } // EvalsByJob returns all the evaluations by job id -func (s *StateStore) EvalsByJob(jobID string) ([]*structs.Evaluation, error) { +func (s *StateStore) EvalsByJob(ws memdb.WatchSet, jobID string) ([]*structs.Evaluation, error) { txn := s.db.Txn(false) // Get an iterator over the node allocations @@ -911,6 +870,8 @@ func (s *StateStore) EvalsByJob(jobID string) ([]*structs.Evaluation, error) { return nil, err } + ws.Add(iter.WatchCh()) + var out []*structs.Evaluation for { raw := iter.Next() @@ -923,7 +884,7 @@ func (s *StateStore) EvalsByJob(jobID string) ([]*structs.Evaluation, error) { } // Evals returns an iterator over all the evaluations -func (s *StateStore) Evals() (memdb.ResultIterator, error) { +func (s *StateStore) Evals(ws memdb.WatchSet) (memdb.ResultIterator, error) { txn := s.db.Txn(false) // Walk the entire table @@ -931,6 +892,9 @@ func (s *StateStore) Evals() (memdb.ResultIterator, error) { if err != nil { return nil, err } + + ws.Add(iter.WatchCh()) + return iter, nil } @@ -944,13 +908,9 @@ func (s *StateStore) UpdateAllocsFromClient(index uint64, allocs []*structs.Allo txn := s.db.Txn(true) defer txn.Abort() - // Setup the watcher - watcher := watch.NewItems() - watcher.Add(watch.Item{Table: "allocs"}) - // Handle each of the updated allocations for _, alloc := range allocs { - if err := s.nestedUpdateAllocFromClient(txn, watcher, index, alloc); err != nil { + if err := s.nestedUpdateAllocFromClient(txn, index, alloc); err != nil { return err } } @@ -960,13 +920,12 @@ func (s *StateStore) UpdateAllocsFromClient(index uint64, allocs []*structs.Allo return fmt.Errorf("index update failed: %v", err) } - txn.Defer(func() { s.watch.notify(watcher) }) txn.Commit() return nil } // nestedUpdateAllocFromClient is used to nest an update of an allocation with client status -func (s *StateStore) nestedUpdateAllocFromClient(txn *memdb.Txn, watcher watch.Items, index uint64, alloc *structs.Allocation) error { +func (s *StateStore) nestedUpdateAllocFromClient(txn *memdb.Txn, index uint64, alloc *structs.Allocation) error { // Look for existing alloc existing, err := txn.First("allocs", "id", alloc.ID) if err != nil { @@ -979,12 +938,6 @@ func (s *StateStore) nestedUpdateAllocFromClient(txn *memdb.Txn, watcher watch.I } exist := existing.(*structs.Allocation) - // Trigger the watcher - watcher.Add(watch.Item{Alloc: alloc.ID}) - watcher.Add(watch.Item{AllocEval: exist.EvalID}) - watcher.Add(watch.Item{AllocJob: exist.JobID}) - watcher.Add(watch.Item{AllocNode: exist.NodeID}) - // Copy everything from the existing allocation copyAlloc := new(structs.Allocation) *copyAlloc = *exist @@ -997,7 +950,7 @@ func (s *StateStore) nestedUpdateAllocFromClient(txn *memdb.Txn, watcher watch.I // Update the modify index copyAlloc.ModifyIndex = index - if err := s.updateSummaryWithAlloc(index, copyAlloc, exist, watcher, txn); err != nil { + if err := s.updateSummaryWithAlloc(index, copyAlloc, exist, txn); err != nil { return fmt.Errorf("error updating job summary: %v", err) } @@ -1012,7 +965,7 @@ func (s *StateStore) nestedUpdateAllocFromClient(txn *memdb.Txn, watcher watch.I forceStatus = structs.JobStatusRunning } jobs := map[string]string{exist.JobID: forceStatus} - if err := s.setJobStatuses(index, watcher, txn, jobs, false); err != nil { + if err := s.setJobStatuses(index, txn, jobs, false); err != nil { return fmt.Errorf("setting job status failed: %v", err) } return nil @@ -1024,9 +977,6 @@ func (s *StateStore) UpsertAllocs(index uint64, allocs []*structs.Allocation) er txn := s.db.Txn(true) defer txn.Abort() - watcher := watch.NewItems() - watcher.Add(watch.Item{Table: "allocs"}) - // Handle the allocations jobs := make(map[string]string, 1) for _, alloc := range allocs { @@ -1058,7 +1008,7 @@ func (s *StateStore) UpsertAllocs(index uint64, allocs []*structs.Allocation) er } } - if err := s.updateSummaryWithAlloc(index, alloc, exist, watcher, txn); err != nil { + if err := s.updateSummaryWithAlloc(index, alloc, exist, txn); err != nil { return fmt.Errorf("error updating job summary: %v", err) } @@ -1078,11 +1028,6 @@ func (s *StateStore) UpsertAllocs(index uint64, allocs []*structs.Allocation) er forceStatus = structs.JobStatusRunning } jobs[alloc.JobID] = forceStatus - - watcher.Add(watch.Item{Alloc: alloc.ID}) - watcher.Add(watch.Item{AllocEval: alloc.EvalID}) - watcher.Add(watch.Item{AllocJob: alloc.JobID}) - watcher.Add(watch.Item{AllocNode: alloc.NodeID}) } // Update the indexes @@ -1091,24 +1036,25 @@ func (s *StateStore) UpsertAllocs(index uint64, allocs []*structs.Allocation) er } // Set the job's status - if err := s.setJobStatuses(index, watcher, txn, jobs, false); err != nil { + if err := s.setJobStatuses(index, txn, jobs, false); err != nil { return fmt.Errorf("setting job status failed: %v", err) } - txn.Defer(func() { s.watch.notify(watcher) }) txn.Commit() return nil } // AllocByID is used to lookup an allocation by its ID -func (s *StateStore) AllocByID(id string) (*structs.Allocation, error) { +func (s *StateStore) AllocByID(ws memdb.WatchSet, id string) (*structs.Allocation, error) { txn := s.db.Txn(false) - existing, err := txn.First("allocs", "id", id) + watchCh, existing, err := txn.FirstWatch("allocs", "id", id) if err != nil { return nil, fmt.Errorf("alloc lookup failed: %v", err) } + ws.Add(watchCh) + if existing != nil { return existing.(*structs.Allocation), nil } @@ -1116,7 +1062,7 @@ func (s *StateStore) AllocByID(id string) (*structs.Allocation, error) { } // AllocsByIDPrefix is used to lookup allocs by prefix -func (s *StateStore) AllocsByIDPrefix(id string) (memdb.ResultIterator, error) { +func (s *StateStore) AllocsByIDPrefix(ws memdb.WatchSet, id string) (memdb.ResultIterator, error) { txn := s.db.Txn(false) iter, err := txn.Get("allocs", "id_prefix", id) @@ -1124,11 +1070,13 @@ func (s *StateStore) AllocsByIDPrefix(id string) (memdb.ResultIterator, error) { return nil, fmt.Errorf("alloc lookup failed: %v", err) } + ws.Add(iter.WatchCh()) + return iter, nil } // AllocsByNode returns all the allocations by node -func (s *StateStore) AllocsByNode(node string) ([]*structs.Allocation, error) { +func (s *StateStore) AllocsByNode(ws memdb.WatchSet, node string) ([]*structs.Allocation, error) { txn := s.db.Txn(false) // Get an iterator over the node allocations, using only the @@ -1138,6 +1086,8 @@ func (s *StateStore) AllocsByNode(node string) ([]*structs.Allocation, error) { return nil, err } + ws.Add(iter.WatchCh()) + var out []*structs.Allocation for { raw := iter.Next() @@ -1150,7 +1100,7 @@ func (s *StateStore) AllocsByNode(node string) ([]*structs.Allocation, error) { } // AllocsByNode returns all the allocations by node and terminal status -func (s *StateStore) AllocsByNodeTerminal(node string, terminal bool) ([]*structs.Allocation, error) { +func (s *StateStore) AllocsByNodeTerminal(ws memdb.WatchSet, node string, terminal bool) ([]*structs.Allocation, error) { txn := s.db.Txn(false) // Get an iterator over the node allocations @@ -1159,6 +1109,8 @@ func (s *StateStore) AllocsByNodeTerminal(node string, terminal bool) ([]*struct return nil, err } + ws.Add(iter.WatchCh()) + var out []*structs.Allocation for { raw := iter.Next() @@ -1171,7 +1123,7 @@ func (s *StateStore) AllocsByNodeTerminal(node string, terminal bool) ([]*struct } // AllocsByJob returns all the allocations by job id -func (s *StateStore) AllocsByJob(jobID string, all bool) ([]*structs.Allocation, error) { +func (s *StateStore) AllocsByJob(ws memdb.WatchSet, jobID string, all bool) ([]*structs.Allocation, error) { txn := s.db.Txn(false) // Get the job @@ -1190,6 +1142,8 @@ func (s *StateStore) AllocsByJob(jobID string, all bool) ([]*structs.Allocation, return nil, err } + ws.Add(iter.WatchCh()) + var out []*structs.Allocation for { raw := iter.Next() @@ -1210,7 +1164,7 @@ func (s *StateStore) AllocsByJob(jobID string, all bool) ([]*structs.Allocation, } // AllocsByEval returns all the allocations by eval id -func (s *StateStore) AllocsByEval(evalID string) ([]*structs.Allocation, error) { +func (s *StateStore) AllocsByEval(ws memdb.WatchSet, evalID string) ([]*structs.Allocation, error) { txn := s.db.Txn(false) // Get an iterator over the eval allocations @@ -1219,6 +1173,8 @@ func (s *StateStore) AllocsByEval(evalID string) ([]*structs.Allocation, error) return nil, err } + ws.Add(iter.WatchCh()) + var out []*structs.Allocation for { raw := iter.Next() @@ -1231,7 +1187,7 @@ func (s *StateStore) AllocsByEval(evalID string) ([]*structs.Allocation, error) } // Allocs returns an iterator over all the evaluations -func (s *StateStore) Allocs() (memdb.ResultIterator, error) { +func (s *StateStore) Allocs(ws memdb.WatchSet) (memdb.ResultIterator, error) { txn := s.db.Txn(false) // Walk the entire table @@ -1239,6 +1195,9 @@ func (s *StateStore) Allocs() (memdb.ResultIterator, error) { if err != nil { return nil, err } + + ws.Add(iter.WatchCh()) + return iter, nil } @@ -1287,14 +1246,16 @@ func (s *StateStore) DeleteVaultAccessors(index uint64, accessors []*structs.Vau } // VaultAccessor returns the given Vault accessor -func (s *StateStore) VaultAccessor(accessor string) (*structs.VaultAccessor, error) { +func (s *StateStore) VaultAccessor(ws memdb.WatchSet, accessor string) (*structs.VaultAccessor, error) { txn := s.db.Txn(false) - existing, err := txn.First("vault_accessors", "id", accessor) + watchCh, existing, err := txn.FirstWatch("vault_accessors", "id", accessor) if err != nil { return nil, fmt.Errorf("accessor lookup failed: %v", err) } + ws.Add(watchCh) + if existing != nil { return existing.(*structs.VaultAccessor), nil } @@ -1303,18 +1264,21 @@ func (s *StateStore) VaultAccessor(accessor string) (*structs.VaultAccessor, err } // VaultAccessors returns an iterator of Vault accessors. -func (s *StateStore) VaultAccessors() (memdb.ResultIterator, error) { +func (s *StateStore) VaultAccessors(ws memdb.WatchSet) (memdb.ResultIterator, error) { txn := s.db.Txn(false) iter, err := txn.Get("vault_accessors", "id") if err != nil { return nil, err } + + ws.Add(iter.WatchCh()) + return iter, nil } // VaultAccessorsByAlloc returns all the Vault accessors by alloc id -func (s *StateStore) VaultAccessorsByAlloc(allocID string) ([]*structs.VaultAccessor, error) { +func (s *StateStore) VaultAccessorsByAlloc(ws memdb.WatchSet, allocID string) ([]*structs.VaultAccessor, error) { txn := s.db.Txn(false) // Get an iterator over the accessors @@ -1323,6 +1287,8 @@ func (s *StateStore) VaultAccessorsByAlloc(allocID string) ([]*structs.VaultAcce return nil, err } + ws.Add(iter.WatchCh()) + var out []*structs.VaultAccessor for { raw := iter.Next() @@ -1335,7 +1301,7 @@ func (s *StateStore) VaultAccessorsByAlloc(allocID string) ([]*structs.VaultAcce } // VaultAccessorsByNode returns all the Vault accessors by node id -func (s *StateStore) VaultAccessorsByNode(nodeID string) ([]*structs.VaultAccessor, error) { +func (s *StateStore) VaultAccessorsByNode(ws memdb.WatchSet, nodeID string) ([]*structs.VaultAccessor, error) { txn := s.db.Txn(false) // Get an iterator over the accessors @@ -1344,6 +1310,8 @@ func (s *StateStore) VaultAccessorsByNode(nodeID string) ([]*structs.VaultAccess return nil, err } + ws.Add(iter.WatchCh()) + var out []*structs.VaultAccessor for { raw := iter.Next() @@ -1508,7 +1476,7 @@ func (s *StateStore) ReconcileJobSummaries(index uint64) error { // setJobStatuses is a helper for calling setJobStatus on multiple jobs by ID. // It takes a map of job IDs to an optional forceStatus string. It returns an // error if the job doesn't exist or setJobStatus fails. -func (s *StateStore) setJobStatuses(index uint64, watcher watch.Items, txn *memdb.Txn, +func (s *StateStore) setJobStatuses(index uint64, txn *memdb.Txn, jobs map[string]string, evalDelete bool) error { for job, forceStatus := range jobs { existing, err := txn.First("jobs", "id", job) @@ -1520,7 +1488,7 @@ func (s *StateStore) setJobStatuses(index uint64, watcher watch.Items, txn *memd continue } - if err := s.setJobStatus(index, watcher, txn, existing.(*structs.Job), evalDelete, forceStatus); err != nil { + if err := s.setJobStatus(index, txn, existing.(*structs.Job), evalDelete, forceStatus); err != nil { return err } } @@ -1533,7 +1501,7 @@ func (s *StateStore) setJobStatuses(index uint64, watcher watch.Items, txn *memd // called because an evaluation is being deleted (potentially because of garbage // collection). If forceStatus is non-empty, the job's status will be set to the // passed status. -func (s *StateStore) setJobStatus(index uint64, watcher watch.Items, txn *memdb.Txn, +func (s *StateStore) setJobStatus(index uint64, txn *memdb.Txn, job *structs.Job, evalDelete bool, forceStatus string) error { // Capture the current status so we can check if there is a change @@ -1557,10 +1525,6 @@ func (s *StateStore) setJobStatus(index uint64, watcher watch.Items, txn *memdb. return nil } - // The job has changed, so add to watcher. - watcher.Add(watch.Item{Table: "jobs"}) - watcher.Add(watch.Item{Job: job.ID}) - // Copy and update the existing job updated := job.Copy() updated.Status = newStatus @@ -1623,9 +1587,6 @@ func (s *StateStore) setJobStatus(index uint64, watcher watch.Items, txn *memdb. // Update the index pSummary.ModifyIndex = index - watcher.Add(watch.Item{Table: "job_summary"}) - watcher.Add(watch.Item{JobSummary: updated.ParentID}) - // Insert the summary if err := txn.Insert("job_summary", pSummary); err != nil { return fmt.Errorf("job summary insert failed: %v", err) @@ -1685,7 +1646,7 @@ func (s *StateStore) getJobStatus(txn *memdb.Txn, job *structs.Job, evalDelete b // updateSummaryWithJob creates or updates job summaries when new jobs are // upserted or existing ones are updated func (s *StateStore) updateSummaryWithJob(index uint64, job *structs.Job, - watcher watch.Items, txn *memdb.Txn) error { + txn *memdb.Txn) error { // Update the job summary summaryRaw, err := txn.First("job_summary", "id", job.ID) @@ -1721,12 +1682,9 @@ func (s *StateStore) updateSummaryWithJob(index uint64, job *structs.Job, } } - // The job summary has changed, so add to watcher and update the modify - // index. + // The job summary has changed, so update the modify index. if hasSummaryChanged { summary.ModifyIndex = index - watcher.Add(watch.Item{Table: "job_summary"}) - watcher.Add(watch.Item{JobSummary: job.ID}) // Update the indexes table for job summary if err := txn.Insert("index", &IndexEntry{"job_summary", index}); err != nil { @@ -1743,7 +1701,7 @@ func (s *StateStore) updateSummaryWithJob(index uint64, job *structs.Job, // updateSummaryWithAlloc updates the job summary when allocations are updated // or inserted func (s *StateStore) updateSummaryWithAlloc(index uint64, alloc *structs.Allocation, - existingAlloc *structs.Allocation, watcher watch.Items, txn *memdb.Txn) error { + existingAlloc *structs.Allocation, txn *memdb.Txn) error { // We don't have to update the summary if the job is missing if alloc.Job == nil { @@ -1837,8 +1795,6 @@ func (s *StateStore) updateSummaryWithAlloc(index uint64, alloc *structs.Allocat if summaryChanged { jobSummary.ModifyIndex = index - watcher.Add(watch.Item{Table: "job_summary"}) - watcher.Add(watch.Item{JobSummary: alloc.JobID}) // Update the indexes table for job summary if err := txn.Insert("index", &IndexEntry{"job_summary", index}); err != nil { @@ -1881,9 +1837,7 @@ type StateSnapshot struct { // restoring state by only using a single large transaction // instead of thousands of sub transactions type StateRestore struct { - txn *memdb.Txn - watch *stateWatch - items watch.Items + txn *memdb.Txn } // Abort is used to abort the restore operation @@ -1893,14 +1847,11 @@ func (s *StateRestore) Abort() { // Commit is used to commit the restore operation func (s *StateRestore) Commit() { - s.txn.Defer(func() { s.watch.notify(s.items) }) s.txn.Commit() } // NodeRestore is used to restore a node func (r *StateRestore) NodeRestore(node *structs.Node) error { - r.items.Add(watch.Item{Table: "nodes"}) - r.items.Add(watch.Item{Node: node.ID}) if err := r.txn.Insert("nodes", node); err != nil { return fmt.Errorf("node insert failed: %v", err) } @@ -1909,9 +1860,6 @@ func (r *StateRestore) NodeRestore(node *structs.Node) error { // JobRestore is used to restore a job func (r *StateRestore) JobRestore(job *structs.Job) error { - r.items.Add(watch.Item{Table: "jobs"}) - r.items.Add(watch.Item{Job: job.ID}) - // Create the EphemeralDisk if it's nil by adding up DiskMB from task resources. // COMPAT 0.4.1 -> 0.5 r.addEphemeralDiskToTaskGroups(job) @@ -1924,9 +1872,6 @@ func (r *StateRestore) JobRestore(job *structs.Job) error { // EvalRestore is used to restore an evaluation func (r *StateRestore) EvalRestore(eval *structs.Evaluation) error { - r.items.Add(watch.Item{Table: "evals"}) - r.items.Add(watch.Item{Eval: eval.ID}) - r.items.Add(watch.Item{EvalJob: eval.JobID}) if err := r.txn.Insert("evals", eval); err != nil { return fmt.Errorf("eval insert failed: %v", err) } @@ -1935,12 +1880,6 @@ func (r *StateRestore) EvalRestore(eval *structs.Evaluation) error { // AllocRestore is used to restore an allocation func (r *StateRestore) AllocRestore(alloc *structs.Allocation) error { - r.items.Add(watch.Item{Table: "allocs"}) - r.items.Add(watch.Item{Alloc: alloc.ID}) - r.items.Add(watch.Item{AllocEval: alloc.EvalID}) - r.items.Add(watch.Item{AllocJob: alloc.JobID}) - r.items.Add(watch.Item{AllocNode: alloc.NodeID}) - // Set the shared resources if it's not present // COMPAT 0.4.1 -> 0.5 if alloc.SharedResources == nil { @@ -1970,8 +1909,6 @@ func (r *StateRestore) IndexRestore(idx *IndexEntry) error { // PeriodicLaunchRestore is used to restore a periodic launch. func (r *StateRestore) PeriodicLaunchRestore(launch *structs.PeriodicLaunch) error { - r.items.Add(watch.Item{Table: "periodic_launch"}) - r.items.Add(watch.Item{Job: launch.ID}) if err := r.txn.Insert("periodic_launch", launch); err != nil { return fmt.Errorf("periodic launch insert failed: %v", err) } @@ -1980,8 +1917,6 @@ func (r *StateRestore) PeriodicLaunchRestore(launch *structs.PeriodicLaunch) err // JobSummaryRestore is used to restore a job summary func (r *StateRestore) JobSummaryRestore(jobSummary *structs.JobSummary) error { - r.items.Add(watch.Item{Table: "job_summary"}) - r.items.Add(watch.Item{JobSummary: jobSummary.JobID}) if err := r.txn.Insert("job_summary", jobSummary); err != nil { return fmt.Errorf("job summary insert failed: %v", err) } @@ -2014,59 +1949,3 @@ func (r *StateRestore) addEphemeralDiskToTaskGroups(job *structs.Job) { } } } - -// stateWatch holds shared state for watching updates. This is -// outside of StateStore so it can be shared with snapshots. -type stateWatch struct { - items map[watch.Item]*NotifyGroup - l sync.Mutex -} - -// newStateWatch creates a new stateWatch for change notification. -func newStateWatch() *stateWatch { - return &stateWatch{ - items: make(map[watch.Item]*NotifyGroup), - } -} - -// watch subscribes a channel to the given watch items. -func (w *stateWatch) watch(items watch.Items, ch chan struct{}) { - w.l.Lock() - defer w.l.Unlock() - - for item, _ := range items { - grp, ok := w.items[item] - if !ok { - grp = new(NotifyGroup) - w.items[item] = grp - } - grp.Wait(ch) - } -} - -// stopWatch unsubscribes a channel from the given watch items. -func (w *stateWatch) stopWatch(items watch.Items, ch chan struct{}) { - w.l.Lock() - defer w.l.Unlock() - - for item, _ := range items { - if grp, ok := w.items[item]; ok { - grp.Clear(ch) - if grp.Empty() { - delete(w.items, item) - } - } - } -} - -// notify is used to fire notifications on the given watch items. -func (w *stateWatch) notify(items watch.Items) { - w.l.Lock() - defer w.l.Unlock() - - for wi, _ := range items { - if grp, ok := w.items[wi]; ok { - grp.Notify() - } - } -}