diff --git a/nomad/alloc_endpoint.go b/nomad/alloc_endpoint.go index e8b6af63c..c07d5549d 100644 --- a/nomad/alloc_endpoint.go +++ b/nomad/alloc_endpoint.go @@ -86,8 +86,8 @@ func (a *Alloc) GetAlloc(args *structs.AllocSpecificRequest, } // Setup the output + reply.Alloc = out if out != nil { - reply.Alloc = out reply.Index = out.ModifyIndex } else { // Use the last index that affected the nodes table diff --git a/nomad/alloc_endpoint_test.go b/nomad/alloc_endpoint_test.go index 4147011ac..bcab0a387 100644 --- a/nomad/alloc_endpoint_test.go +++ b/nomad/alloc_endpoint_test.go @@ -74,7 +74,7 @@ func TestAllocEndpoint_List_Blocking(t *testing.T) { t.Fatalf("err: %v", err) } - if elapsed := time.Now().Sub(start); elapsed < 100*time.Millisecond { + if elapsed := time.Since(start); elapsed < 100*time.Millisecond { t.Fatalf("should block (returned in %s) %#v", elapsed, resp) } if resp.Index != 2 { @@ -101,7 +101,7 @@ func TestAllocEndpoint_List_Blocking(t *testing.T) { t.Fatalf("err: %v", err) } - if elapsed := time.Now().Sub(start); elapsed < 100*time.Millisecond { + if elapsed := time.Since(start); elapsed < 100*time.Millisecond { t.Fatalf("should block (returned in %s) %#v", elapsed, resp2) } if resp2.Index != 3 { @@ -186,7 +186,7 @@ func TestAllocEndpoint_GetAlloc_Blocking(t *testing.T) { t.Fatalf("err: %v", err) } - if elapsed := time.Now().Sub(start); elapsed < 200*time.Millisecond { + if elapsed := time.Since(start); elapsed < 200*time.Millisecond { t.Fatalf("should block (returned in %s) %#v", elapsed, resp) } if resp.Index != 200 { diff --git a/nomad/eval_endpoint_test.go b/nomad/eval_endpoint_test.go index 6f3d154e5..55782a031 100644 --- a/nomad/eval_endpoint_test.go +++ b/nomad/eval_endpoint_test.go @@ -92,7 +92,7 @@ func TestEvalEndpoint_GetEval_Blocking(t *testing.T) { t.Fatalf("err: %v", err) } - if elapsed := time.Now().Sub(start); elapsed < 200*time.Millisecond { + if elapsed := time.Since(start); elapsed < 200*time.Millisecond { t.Fatalf("should block (returned in %s) %#v", elapsed, resp) } if resp.Index != 200 { @@ -117,7 +117,7 @@ func TestEvalEndpoint_GetEval_Blocking(t *testing.T) { t.Fatalf("err: %v", err) } - if elapsed := time.Now().Sub(start); elapsed < 100*time.Millisecond { + if elapsed := time.Since(start); elapsed < 100*time.Millisecond { t.Fatalf("should block (returned in %s) %#v", elapsed, resp2) } if resp2.Index != 300 { @@ -440,7 +440,7 @@ func TestEvalEndpoint_List_Blocking(t *testing.T) { t.Fatalf("err: %v", err) } - if elapsed := time.Now().Sub(start); elapsed < 100*time.Millisecond { + if elapsed := time.Since(start); elapsed < 100*time.Millisecond { t.Fatalf("should block (returned in %s) %#v", elapsed, resp) } if resp.Index != 2 { @@ -464,7 +464,7 @@ func TestEvalEndpoint_List_Blocking(t *testing.T) { t.Fatalf("err: %v", err) } - if elapsed := time.Now().Sub(start); elapsed < 100*time.Millisecond { + if elapsed := time.Since(start); elapsed < 100*time.Millisecond { t.Fatalf("should block (returned in %s) %#v", elapsed, resp2) } if resp2.Index != 3 { @@ -551,7 +551,7 @@ func TestEvalEndpoint_Allocations_Blocking(t *testing.T) { t.Fatalf("err: %v", err) } - if elapsed := time.Now().Sub(start); elapsed < 200*time.Millisecond { + if elapsed := time.Since(start); elapsed < 200*time.Millisecond { t.Fatalf("should block (returned in %s) %#v", elapsed, resp) } if resp.Index != 200 { diff --git a/nomad/job_endpoint_test.go b/nomad/job_endpoint_test.go index 9e09de538..c12e5b463 100644 --- a/nomad/job_endpoint_test.go +++ b/nomad/job_endpoint_test.go @@ -402,7 +402,7 @@ func TestJobEndpoint_GetJob_Blocking(t *testing.T) { t.Fatalf("err: %v", err) } - if elapsed := time.Now().Sub(start); elapsed < 200*time.Millisecond { + if elapsed := time.Since(start); elapsed < 200*time.Millisecond { t.Fatalf("should block (returned in %s) %#v", elapsed, resp) } if resp.Index != 200 { @@ -427,7 +427,7 @@ func TestJobEndpoint_GetJob_Blocking(t *testing.T) { t.Fatalf("err: %v", err) } - if elapsed := time.Now().Sub(start); elapsed < 100*time.Millisecond { + if elapsed := time.Since(start); elapsed < 100*time.Millisecond { t.Fatalf("should block (returned in %s) %#v", elapsed, resp2) } if resp2.Index != 300 { @@ -501,7 +501,7 @@ func TestJobEndpoint_ListJobs_Blocking(t *testing.T) { t.Fatalf("err: %v", err) } - if elapsed := time.Now().Sub(start); elapsed < 100*time.Millisecond { + if elapsed := time.Since(start); elapsed < 100*time.Millisecond { t.Fatalf("should block (returned in %s) %#v", elapsed, resp) } if resp.Index != 100 { @@ -525,7 +525,7 @@ func TestJobEndpoint_ListJobs_Blocking(t *testing.T) { t.Fatalf("err: %v", err) } - if elapsed := time.Now().Sub(start); elapsed < 100*time.Millisecond { + if elapsed := time.Since(start); elapsed < 100*time.Millisecond { t.Fatalf("should block (returned in %s) %#v", elapsed, resp2) } if resp2.Index != 200 { @@ -613,7 +613,7 @@ func TestJobEndpoint_Allocations_Blocking(t *testing.T) { t.Fatalf("err: %v", err) } - if elapsed := time.Now().Sub(start); elapsed < 200*time.Millisecond { + if elapsed := time.Since(start); elapsed < 200*time.Millisecond { t.Fatalf("should block (returned in %s) %#v", elapsed, resp) } if resp.Index != 200 { diff --git a/nomad/node_endpoint_test.go b/nomad/node_endpoint_test.go index 9a74316c7..74b154655 100644 --- a/nomad/node_endpoint_test.go +++ b/nomad/node_endpoint_test.go @@ -410,7 +410,7 @@ func TestClientEndpoint_GetNode_Blocking(t *testing.T) { t.Fatalf("err: %v", err) } - if elapsed := time.Now().Sub(start); elapsed < 200*time.Millisecond { + if elapsed := time.Since(start); elapsed < 200*time.Millisecond { t.Fatalf("should block (returned in %s) %#v", elapsed, resp) } if resp.Index != 200 { @@ -437,7 +437,7 @@ func TestClientEndpoint_GetNode_Blocking(t *testing.T) { t.Fatalf("err: %v", err) } - if elapsed := time.Now().Sub(start); elapsed < 100*time.Millisecond { + if elapsed := time.Since(start); elapsed < 100*time.Millisecond { t.Fatalf("should block (returned in %s) %#v", elapsed, resp) } if resp2.Index != 300 { @@ -461,7 +461,7 @@ func TestClientEndpoint_GetNode_Blocking(t *testing.T) { t.Fatalf("err: %v", err) } - if elapsed := time.Now().Sub(start); elapsed < 100*time.Millisecond { + if elapsed := time.Since(start); elapsed < 100*time.Millisecond { t.Fatalf("should block (returned in %s) %#v", elapsed, resp) } if resp3.Index != 400 { @@ -910,7 +910,7 @@ func TestClientEndpoint_ListNodes_Blocking(t *testing.T) { t.Fatalf("err: %v", err) } - if elapsed := time.Now().Sub(start); elapsed < 100*time.Millisecond { + if elapsed := time.Since(start); elapsed < 100*time.Millisecond { t.Fatalf("should block (returned in %s) %#v", elapsed, resp) } if resp.Index != 2 { @@ -934,7 +934,7 @@ func TestClientEndpoint_ListNodes_Blocking(t *testing.T) { t.Fatalf("err: %v", err) } - if elapsed := time.Now().Sub(start); elapsed < 100*time.Millisecond { + if elapsed := time.Since(start); elapsed < 100*time.Millisecond { t.Fatalf("should block (returned in %s) %#v", elapsed, resp2) } if resp2.Index != 3 { @@ -958,7 +958,7 @@ func TestClientEndpoint_ListNodes_Blocking(t *testing.T) { t.Fatalf("err: %v", err) } - if elapsed := time.Now().Sub(start); elapsed < 100*time.Millisecond { + if elapsed := time.Since(start); elapsed < 100*time.Millisecond { t.Fatalf("should block (returned in %s) %#v", elapsed, resp3) } if resp3.Index != 4 { @@ -982,7 +982,7 @@ func TestClientEndpoint_ListNodes_Blocking(t *testing.T) { t.Fatalf("err: %v", err) } - if elapsed := time.Now().Sub(start); elapsed < 100*time.Millisecond { + if elapsed := time.Since(start); elapsed < 100*time.Millisecond { t.Fatalf("should block (returned in %s) %#v", elapsed, resp4) } if resp4.Index != 5 { diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 47ead285e..ec5aef29f 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -455,8 +455,11 @@ func (s *StateStore) DeleteEval(index uint64, evals []string, allocs []string) e if err := txn.Delete("allocs", existing); err != nil { return fmt.Errorf("alloc delete failed: %v", err) } - watcher.Add(watch.Item{Alloc: alloc}) - watcher.Add(watch.Item{AllocNode: existing.(*structs.Allocation).NodeID}) + realAlloc := existing.(*structs.Allocation) + watcher.Add(watch.Item{Alloc: realAlloc.ID}) + watcher.Add(watch.Item{AllocEval: realAlloc.EvalID}) + watcher.Add(watch.Item{AllocJob: realAlloc.JobID}) + watcher.Add(watch.Item{AllocNode: realAlloc.NodeID}) } // Update the indexes @@ -795,6 +798,9 @@ func (r *StateRestore) EvalRestore(eval *structs.Evaluation) error { // AllocRestore is used to restore an allocation func (r *StateRestore) AllocRestore(alloc *structs.Allocation) error { r.items.Add(watch.Item{Table: "allocs"}) + r.items.Add(watch.Item{Alloc: alloc.ID}) + r.items.Add(watch.Item{AllocEval: alloc.EvalID}) + r.items.Add(watch.Item{AllocJob: alloc.JobID}) r.items.Add(watch.Item{AllocNode: alloc.NodeID}) if err := r.txn.Insert("allocs", alloc); err != nil { return fmt.Errorf("alloc insert failed: %v", err) diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index 788b9f26a..2a5967450 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -26,11 +26,10 @@ func TestStateStore_UpsertNode_Node(t *testing.T) { state := testStateStore(t) node := mock.Node() - notify := notifyTest{ - {desc: "table", item: watch.Item{Table: "nodes"}}, - {desc: "node", item: watch.Item{Node: node.ID}}, - } - notify.start(state) + notify := setupNotifyTest( + state, + watch.Item{Table: "nodes"}, + watch.Item{Node: node.ID}) err := state.UpsertNode(1000, node) if err != nil { @@ -61,11 +60,10 @@ func TestStateStore_DeleteNode_Node(t *testing.T) { state := testStateStore(t) node := mock.Node() - notify := notifyTest{ - {desc: "table", item: watch.Item{Table: "nodes"}}, - {desc: "node", item: watch.Item{Node: node.ID}}, - } - notify.start(state) + notify := setupNotifyTest( + state, + watch.Item{Table: "nodes"}, + watch.Item{Node: node.ID}) err := state.UpsertNode(1000, node) if err != nil { @@ -101,11 +99,10 @@ func TestStateStore_UpdateNodeStatus_Node(t *testing.T) { state := testStateStore(t) node := mock.Node() - notify := notifyTest{ - {desc: "table", item: watch.Item{Table: "nodes"}}, - {desc: "node", item: watch.Item{Node: node.ID}}, - } - notify.start(state) + notify := setupNotifyTest( + state, + watch.Item{Table: "nodes"}, + watch.Item{Node: node.ID}) err := state.UpsertNode(1000, node) if err != nil { @@ -144,11 +141,10 @@ func TestStateStore_UpdateNodeDrain_Node(t *testing.T) { state := testStateStore(t) node := mock.Node() - notify := notifyTest{ - {desc: "table", item: watch.Item{Table: "nodes"}}, - {desc: "node", item: watch.Item{Node: node.ID}}, - } - notify.start(state) + notify := setupNotifyTest( + state, + watch.Item{Table: "nodes"}, + watch.Item{Node: node.ID}) err := state.UpsertNode(1000, node) if err != nil { @@ -223,11 +219,10 @@ func TestStateStore_RestoreNode(t *testing.T) { state := testStateStore(t) node := mock.Node() - notify := notifyTest{ - {desc: "table", item: watch.Item{Table: "nodes"}}, - {desc: "node", item: watch.Item{Node: node.ID}}, - } - notify.start(state) + notify := setupNotifyTest( + state, + watch.Item{Table: "nodes"}, + watch.Item{Node: node.ID}) restore, err := state.Restore() if err != nil { @@ -256,11 +251,10 @@ func TestStateStore_UpsertJob_Job(t *testing.T) { state := testStateStore(t) job := mock.Job() - notify := notifyTest{ - {desc: "table", item: watch.Item{Table: "jobs"}}, - {desc: "job", item: watch.Item{Job: job.ID}}, - } - notify.start(state) + notify := setupNotifyTest( + state, + watch.Item{Table: "jobs"}, + watch.Item{Job: job.ID}) err := state.UpsertJob(1000, job) if err != nil { @@ -291,11 +285,10 @@ func TestStateStore_UpdateUpsertJob_Job(t *testing.T) { state := testStateStore(t) job := mock.Job() - notify := notifyTest{ - {desc: "table", item: watch.Item{Table: "jobs"}}, - {desc: "job", item: watch.Item{Job: job.ID}}, - } - notify.start(state) + notify := setupNotifyTest( + state, + watch.Item{Table: "jobs"}, + watch.Item{Job: job.ID}) err := state.UpsertJob(1000, job) if err != nil { @@ -340,11 +333,10 @@ func TestStateStore_DeleteJob_Job(t *testing.T) { state := testStateStore(t) job := mock.Job() - notify := notifyTest{ - {desc: "table", item: watch.Item{Table: "jobs"}}, - {desc: "job", item: watch.Item{Job: job.ID}}, - } - notify.start(state) + notify := setupNotifyTest( + state, + watch.Item{Table: "jobs"}, + watch.Item{Job: job.ID}) err := state.UpsertJob(1000, job) if err != nil { @@ -483,11 +475,10 @@ func TestStateStore_RestoreJob(t *testing.T) { state := testStateStore(t) job := mock.Job() - notify := notifyTest{ - {desc: "table", item: watch.Item{Table: "jobs"}}, - {desc: "job", item: watch.Item{Job: job.ID}}, - } - notify.start(state) + notify := setupNotifyTest( + state, + watch.Item{Table: "jobs"}, + watch.Item{Job: job.ID}) restore, err := state.Restore() if err != nil { @@ -574,11 +565,10 @@ func TestStateStore_UpsertEvals_Eval(t *testing.T) { state := testStateStore(t) eval := mock.Eval() - notify := notifyTest{ - {desc: "table", item: watch.Item{Table: "evals"}}, - {desc: "eval", item: watch.Item{Eval: eval.ID}}, - } - notify.start(state) + notify := setupNotifyTest( + state, + watch.Item{Table: "evals"}, + watch.Item{Eval: eval.ID}) err := state.UpsertEvals(1000, []*structs.Evaluation{eval}) if err != nil { @@ -614,11 +604,10 @@ func TestStateStore_Update_UpsertEvals_Eval(t *testing.T) { t.Fatalf("err: %v", err) } - notify := notifyTest{ - {desc: "table", item: watch.Item{Table: "evals"}}, - {desc: "eval", item: watch.Item{Eval: eval.ID}}, - } - notify.start(state) + notify := setupNotifyTest( + state, + watch.Item{Table: "evals"}, + watch.Item{Eval: eval.ID}) eval2 := mock.Eval() eval2.ID = eval.ID @@ -661,16 +650,19 @@ func TestStateStore_DeleteEval_Eval(t *testing.T) { alloc1 := mock.Alloc() alloc2 := mock.Alloc() - notify := notifyTest{ - {desc: "table", item: watch.Item{Table: "evals"}}, - {desc: "eval1", item: watch.Item{Eval: eval1.ID}}, - {desc: "eval2", item: watch.Item{Eval: eval2.ID}}, - {desc: "alloc1", item: watch.Item{Alloc: alloc1.ID}}, - {desc: "alloc2", item: watch.Item{Alloc: alloc2.ID}}, - {desc: "allocnode1", item: watch.Item{AllocNode: alloc1.NodeID}}, - {desc: "allocnode2", item: watch.Item{AllocNode: alloc2.NodeID}}, - } - notify.start(state) + notify := setupNotifyTest( + state, + watch.Item{Table: "evals"}, + watch.Item{Eval: eval1.ID}, + watch.Item{Eval: eval2.ID}, + watch.Item{Alloc: alloc1.ID}, + watch.Item{Alloc: alloc2.ID}, + watch.Item{AllocEval: alloc1.EvalID}, + watch.Item{AllocEval: alloc2.EvalID}, + watch.Item{AllocJob: alloc1.JobID}, + watch.Item{AllocJob: alloc2.JobID}, + watch.Item{AllocNode: alloc1.NodeID}, + watch.Item{AllocNode: alloc2.NodeID}) err := state.UpsertEvals(1000, []*structs.Evaluation{eval1, eval2}) if err != nil { @@ -813,11 +805,10 @@ func TestStateStore_RestoreEval(t *testing.T) { state := testStateStore(t) eval := mock.Eval() - notify := notifyTest{ - {desc: "table", item: watch.Item{Table: "evals"}}, - {desc: "eval", item: watch.Item{Eval: eval.ID}}, - } - notify.start(state) + notify := setupNotifyTest( + state, + watch.Item{Table: "evals"}, + watch.Item{Eval: eval.ID}) restore, err := state.Restore() if err != nil { @@ -846,14 +837,13 @@ func TestStateStore_UpdateAllocFromClient(t *testing.T) { state := testStateStore(t) alloc := mock.Alloc() - notify := notifyTest{ - {desc: "table", item: watch.Item{Table: "allocs"}}, - {desc: "alloc", item: watch.Item{Alloc: alloc.ID}}, - {desc: "alloceval", item: watch.Item{AllocEval: alloc.EvalID}}, - {desc: "allocjob", item: watch.Item{AllocJob: alloc.JobID}}, - {desc: "allocnode", item: watch.Item{AllocNode: alloc.NodeID}}, - } - notify.start(state) + notify := setupNotifyTest( + state, + watch.Item{Table: "allocs"}, + watch.Item{Alloc: alloc.ID}, + watch.Item{AllocEval: alloc.EvalID}, + watch.Item{AllocJob: alloc.JobID}, + watch.Item{AllocNode: alloc.NodeID}) err := state.UpsertAllocs(1000, []*structs.Allocation{alloc}) if err != nil { @@ -894,14 +884,13 @@ func TestStateStore_UpsertAlloc_Alloc(t *testing.T) { state := testStateStore(t) alloc := mock.Alloc() - notify := notifyTest{ - {desc: "table", item: watch.Item{Table: "allocs"}}, - {desc: "alloc", item: watch.Item{Alloc: alloc.ID}}, - {desc: "alloceval", item: watch.Item{AllocEval: alloc.EvalID}}, - {desc: "allocjob", item: watch.Item{AllocJob: alloc.JobID}}, - {desc: "allocnode", item: watch.Item{AllocNode: alloc.NodeID}}, - } - notify.start(state) + notify := setupNotifyTest( + state, + watch.Item{Table: "allocs"}, + watch.Item{Alloc: alloc.ID}, + watch.Item{AllocEval: alloc.EvalID}, + watch.Item{AllocJob: alloc.JobID}, + watch.Item{AllocNode: alloc.NodeID}) err := state.UpsertAllocs(1000, []*structs.Allocation{alloc}) if err != nil { @@ -941,14 +930,13 @@ func TestStateStore_UpdateAlloc_Alloc(t *testing.T) { alloc2.ID = alloc.ID alloc2.NodeID = alloc.NodeID + ".new" - notify := notifyTest{ - {desc: "table", item: watch.Item{Table: "allocs"}}, - {desc: "alloc", item: watch.Item{Alloc: alloc2.ID}}, - {desc: "alloceval", item: watch.Item{AllocEval: alloc2.EvalID}}, - {desc: "allocjob", item: watch.Item{AllocJob: alloc2.JobID}}, - {desc: "allocnode", item: watch.Item{AllocNode: alloc2.NodeID}}, - } - notify.start(state) + notify := setupNotifyTest( + state, + watch.Item{Table: "allocs"}, + watch.Item{Alloc: alloc2.ID}, + watch.Item{AllocEval: alloc2.EvalID}, + watch.Item{AllocJob: alloc2.JobID}, + watch.Item{AllocNode: alloc2.NodeID}) err = state.UpsertAllocs(1001, []*structs.Allocation{alloc2}) if err != nil { @@ -1111,13 +1099,21 @@ func TestStateStore_Allocs(t *testing.T) { func TestStateStore_RestoreAlloc(t *testing.T) { state := testStateStore(t) + alloc := mock.Alloc() + + notify := setupNotifyTest( + state, + watch.Item{Table: "allocs"}, + watch.Item{Alloc: alloc.ID}, + watch.Item{AllocEval: alloc.EvalID}, + watch.Item{AllocJob: alloc.JobID}, + watch.Item{AllocNode: alloc.NodeID}) restore, err := state.Restore() if err != nil { t.Fatalf("err: %v", err) } - alloc := mock.Alloc() err = restore.AllocRestore(alloc) if err != nil { t.Fatalf("err: %v", err) @@ -1133,6 +1129,8 @@ func TestStateStore_RestoreAlloc(t *testing.T) { if !reflect.DeepEqual(out, alloc) { t.Fatalf("Bad: %#v %#v", out, alloc) } + + notify.verify(t) } func TestStateWatch_watch(t *testing.T) { @@ -1172,15 +1170,30 @@ func TestStateWatch_stopWatch(t *testing.T) { // Unsubscribe stop notifications sw.stopWatch(watch.Item{Table: "foo"}, notify) + // Check that the group was removed + if _, ok := sw.items[watch.Item{Table: "foo"}]; ok { + t.Fatalf("should remove group") + } + + // Check that we are not notified sw.notify(watch.NewItems(watch.Item{Table: "foo"})) if len(notify) != 0 { t.Fatalf("should not notify") } } +func setupNotifyTest(state *StateStore, items ...watch.Item) notifyTest { + var n notifyTest + for _, item := range items { + ch := make(chan struct{}, 1) + state.Watch(watch.NewItems(item), ch) + n = append(n, ¬ifyTestCase{item, ch}) + } + return n +} + // notifyTestCase is used to set up and verify watch triggers. type notifyTestCase struct { - desc string item watch.Item ch chan struct{} } @@ -1188,19 +1201,11 @@ type notifyTestCase struct { // notifyTest is a suite of notifyTestCases. type notifyTest []*notifyTestCase -// start creates the notify channels and subscribes them. -func (n notifyTest) start(state *StateStore) { - for _, tcase := range n { - tcase.ch = make(chan struct{}, 1) - state.Watch(watch.NewItems(tcase.item), tcase.ch) - } -} - // verify ensures that each channel received a notification. func (n notifyTest) verify(t *testing.T) { for _, tcase := range n { if len(tcase.ch) != 1 { - t.Fatalf("should notify %s", tcase.desc) + t.Fatalf("should notify %#v", tcase.item) } } }