nomad: cleanup and more tests

This commit is contained in:
Ryan Uber
2015-10-30 08:27:47 -07:00
parent ae4156d2b3
commit 284c2e2f2b
7 changed files with 139 additions and 128 deletions

View File

@@ -86,8 +86,8 @@ func (a *Alloc) GetAlloc(args *structs.AllocSpecificRequest,
}
// Setup the output
reply.Alloc = out
if out != nil {
reply.Alloc = out
reply.Index = out.ModifyIndex
} else {
// Use the last index that affected the nodes table

View File

@@ -74,7 +74,7 @@ func TestAllocEndpoint_List_Blocking(t *testing.T) {
t.Fatalf("err: %v", err)
}
if elapsed := time.Now().Sub(start); elapsed < 100*time.Millisecond {
if elapsed := time.Since(start); elapsed < 100*time.Millisecond {
t.Fatalf("should block (returned in %s) %#v", elapsed, resp)
}
if resp.Index != 2 {
@@ -101,7 +101,7 @@ func TestAllocEndpoint_List_Blocking(t *testing.T) {
t.Fatalf("err: %v", err)
}
if elapsed := time.Now().Sub(start); elapsed < 100*time.Millisecond {
if elapsed := time.Since(start); elapsed < 100*time.Millisecond {
t.Fatalf("should block (returned in %s) %#v", elapsed, resp2)
}
if resp2.Index != 3 {
@@ -186,7 +186,7 @@ func TestAllocEndpoint_GetAlloc_Blocking(t *testing.T) {
t.Fatalf("err: %v", err)
}
if elapsed := time.Now().Sub(start); elapsed < 200*time.Millisecond {
if elapsed := time.Since(start); elapsed < 200*time.Millisecond {
t.Fatalf("should block (returned in %s) %#v", elapsed, resp)
}
if resp.Index != 200 {

View File

@@ -92,7 +92,7 @@ func TestEvalEndpoint_GetEval_Blocking(t *testing.T) {
t.Fatalf("err: %v", err)
}
if elapsed := time.Now().Sub(start); elapsed < 200*time.Millisecond {
if elapsed := time.Since(start); elapsed < 200*time.Millisecond {
t.Fatalf("should block (returned in %s) %#v", elapsed, resp)
}
if resp.Index != 200 {
@@ -117,7 +117,7 @@ func TestEvalEndpoint_GetEval_Blocking(t *testing.T) {
t.Fatalf("err: %v", err)
}
if elapsed := time.Now().Sub(start); elapsed < 100*time.Millisecond {
if elapsed := time.Since(start); elapsed < 100*time.Millisecond {
t.Fatalf("should block (returned in %s) %#v", elapsed, resp2)
}
if resp2.Index != 300 {
@@ -440,7 +440,7 @@ func TestEvalEndpoint_List_Blocking(t *testing.T) {
t.Fatalf("err: %v", err)
}
if elapsed := time.Now().Sub(start); elapsed < 100*time.Millisecond {
if elapsed := time.Since(start); elapsed < 100*time.Millisecond {
t.Fatalf("should block (returned in %s) %#v", elapsed, resp)
}
if resp.Index != 2 {
@@ -464,7 +464,7 @@ func TestEvalEndpoint_List_Blocking(t *testing.T) {
t.Fatalf("err: %v", err)
}
if elapsed := time.Now().Sub(start); elapsed < 100*time.Millisecond {
if elapsed := time.Since(start); elapsed < 100*time.Millisecond {
t.Fatalf("should block (returned in %s) %#v", elapsed, resp2)
}
if resp2.Index != 3 {
@@ -551,7 +551,7 @@ func TestEvalEndpoint_Allocations_Blocking(t *testing.T) {
t.Fatalf("err: %v", err)
}
if elapsed := time.Now().Sub(start); elapsed < 200*time.Millisecond {
if elapsed := time.Since(start); elapsed < 200*time.Millisecond {
t.Fatalf("should block (returned in %s) %#v", elapsed, resp)
}
if resp.Index != 200 {

View File

@@ -402,7 +402,7 @@ func TestJobEndpoint_GetJob_Blocking(t *testing.T) {
t.Fatalf("err: %v", err)
}
if elapsed := time.Now().Sub(start); elapsed < 200*time.Millisecond {
if elapsed := time.Since(start); elapsed < 200*time.Millisecond {
t.Fatalf("should block (returned in %s) %#v", elapsed, resp)
}
if resp.Index != 200 {
@@ -427,7 +427,7 @@ func TestJobEndpoint_GetJob_Blocking(t *testing.T) {
t.Fatalf("err: %v", err)
}
if elapsed := time.Now().Sub(start); elapsed < 100*time.Millisecond {
if elapsed := time.Since(start); elapsed < 100*time.Millisecond {
t.Fatalf("should block (returned in %s) %#v", elapsed, resp2)
}
if resp2.Index != 300 {
@@ -501,7 +501,7 @@ func TestJobEndpoint_ListJobs_Blocking(t *testing.T) {
t.Fatalf("err: %v", err)
}
if elapsed := time.Now().Sub(start); elapsed < 100*time.Millisecond {
if elapsed := time.Since(start); elapsed < 100*time.Millisecond {
t.Fatalf("should block (returned in %s) %#v", elapsed, resp)
}
if resp.Index != 100 {
@@ -525,7 +525,7 @@ func TestJobEndpoint_ListJobs_Blocking(t *testing.T) {
t.Fatalf("err: %v", err)
}
if elapsed := time.Now().Sub(start); elapsed < 100*time.Millisecond {
if elapsed := time.Since(start); elapsed < 100*time.Millisecond {
t.Fatalf("should block (returned in %s) %#v", elapsed, resp2)
}
if resp2.Index != 200 {
@@ -613,7 +613,7 @@ func TestJobEndpoint_Allocations_Blocking(t *testing.T) {
t.Fatalf("err: %v", err)
}
if elapsed := time.Now().Sub(start); elapsed < 200*time.Millisecond {
if elapsed := time.Since(start); elapsed < 200*time.Millisecond {
t.Fatalf("should block (returned in %s) %#v", elapsed, resp)
}
if resp.Index != 200 {

View File

@@ -410,7 +410,7 @@ func TestClientEndpoint_GetNode_Blocking(t *testing.T) {
t.Fatalf("err: %v", err)
}
if elapsed := time.Now().Sub(start); elapsed < 200*time.Millisecond {
if elapsed := time.Since(start); elapsed < 200*time.Millisecond {
t.Fatalf("should block (returned in %s) %#v", elapsed, resp)
}
if resp.Index != 200 {
@@ -437,7 +437,7 @@ func TestClientEndpoint_GetNode_Blocking(t *testing.T) {
t.Fatalf("err: %v", err)
}
if elapsed := time.Now().Sub(start); elapsed < 100*time.Millisecond {
if elapsed := time.Since(start); elapsed < 100*time.Millisecond {
t.Fatalf("should block (returned in %s) %#v", elapsed, resp)
}
if resp2.Index != 300 {
@@ -461,7 +461,7 @@ func TestClientEndpoint_GetNode_Blocking(t *testing.T) {
t.Fatalf("err: %v", err)
}
if elapsed := time.Now().Sub(start); elapsed < 100*time.Millisecond {
if elapsed := time.Since(start); elapsed < 100*time.Millisecond {
t.Fatalf("should block (returned in %s) %#v", elapsed, resp)
}
if resp3.Index != 400 {
@@ -910,7 +910,7 @@ func TestClientEndpoint_ListNodes_Blocking(t *testing.T) {
t.Fatalf("err: %v", err)
}
if elapsed := time.Now().Sub(start); elapsed < 100*time.Millisecond {
if elapsed := time.Since(start); elapsed < 100*time.Millisecond {
t.Fatalf("should block (returned in %s) %#v", elapsed, resp)
}
if resp.Index != 2 {
@@ -934,7 +934,7 @@ func TestClientEndpoint_ListNodes_Blocking(t *testing.T) {
t.Fatalf("err: %v", err)
}
if elapsed := time.Now().Sub(start); elapsed < 100*time.Millisecond {
if elapsed := time.Since(start); elapsed < 100*time.Millisecond {
t.Fatalf("should block (returned in %s) %#v", elapsed, resp2)
}
if resp2.Index != 3 {
@@ -958,7 +958,7 @@ func TestClientEndpoint_ListNodes_Blocking(t *testing.T) {
t.Fatalf("err: %v", err)
}
if elapsed := time.Now().Sub(start); elapsed < 100*time.Millisecond {
if elapsed := time.Since(start); elapsed < 100*time.Millisecond {
t.Fatalf("should block (returned in %s) %#v", elapsed, resp3)
}
if resp3.Index != 4 {
@@ -982,7 +982,7 @@ func TestClientEndpoint_ListNodes_Blocking(t *testing.T) {
t.Fatalf("err: %v", err)
}
if elapsed := time.Now().Sub(start); elapsed < 100*time.Millisecond {
if elapsed := time.Since(start); elapsed < 100*time.Millisecond {
t.Fatalf("should block (returned in %s) %#v", elapsed, resp4)
}
if resp4.Index != 5 {

View File

@@ -455,8 +455,11 @@ func (s *StateStore) DeleteEval(index uint64, evals []string, allocs []string) e
if err := txn.Delete("allocs", existing); err != nil {
return fmt.Errorf("alloc delete failed: %v", err)
}
watcher.Add(watch.Item{Alloc: alloc})
watcher.Add(watch.Item{AllocNode: existing.(*structs.Allocation).NodeID})
realAlloc := existing.(*structs.Allocation)
watcher.Add(watch.Item{Alloc: realAlloc.ID})
watcher.Add(watch.Item{AllocEval: realAlloc.EvalID})
watcher.Add(watch.Item{AllocJob: realAlloc.JobID})
watcher.Add(watch.Item{AllocNode: realAlloc.NodeID})
}
// Update the indexes
@@ -795,6 +798,9 @@ func (r *StateRestore) EvalRestore(eval *structs.Evaluation) error {
// AllocRestore is used to restore an allocation
func (r *StateRestore) AllocRestore(alloc *structs.Allocation) error {
r.items.Add(watch.Item{Table: "allocs"})
r.items.Add(watch.Item{Alloc: alloc.ID})
r.items.Add(watch.Item{AllocEval: alloc.EvalID})
r.items.Add(watch.Item{AllocJob: alloc.JobID})
r.items.Add(watch.Item{AllocNode: alloc.NodeID})
if err := r.txn.Insert("allocs", alloc); err != nil {
return fmt.Errorf("alloc insert failed: %v", err)

View File

@@ -26,11 +26,10 @@ func TestStateStore_UpsertNode_Node(t *testing.T) {
state := testStateStore(t)
node := mock.Node()
notify := notifyTest{
{desc: "table", item: watch.Item{Table: "nodes"}},
{desc: "node", item: watch.Item{Node: node.ID}},
}
notify.start(state)
notify := setupNotifyTest(
state,
watch.Item{Table: "nodes"},
watch.Item{Node: node.ID})
err := state.UpsertNode(1000, node)
if err != nil {
@@ -61,11 +60,10 @@ func TestStateStore_DeleteNode_Node(t *testing.T) {
state := testStateStore(t)
node := mock.Node()
notify := notifyTest{
{desc: "table", item: watch.Item{Table: "nodes"}},
{desc: "node", item: watch.Item{Node: node.ID}},
}
notify.start(state)
notify := setupNotifyTest(
state,
watch.Item{Table: "nodes"},
watch.Item{Node: node.ID})
err := state.UpsertNode(1000, node)
if err != nil {
@@ -101,11 +99,10 @@ func TestStateStore_UpdateNodeStatus_Node(t *testing.T) {
state := testStateStore(t)
node := mock.Node()
notify := notifyTest{
{desc: "table", item: watch.Item{Table: "nodes"}},
{desc: "node", item: watch.Item{Node: node.ID}},
}
notify.start(state)
notify := setupNotifyTest(
state,
watch.Item{Table: "nodes"},
watch.Item{Node: node.ID})
err := state.UpsertNode(1000, node)
if err != nil {
@@ -144,11 +141,10 @@ func TestStateStore_UpdateNodeDrain_Node(t *testing.T) {
state := testStateStore(t)
node := mock.Node()
notify := notifyTest{
{desc: "table", item: watch.Item{Table: "nodes"}},
{desc: "node", item: watch.Item{Node: node.ID}},
}
notify.start(state)
notify := setupNotifyTest(
state,
watch.Item{Table: "nodes"},
watch.Item{Node: node.ID})
err := state.UpsertNode(1000, node)
if err != nil {
@@ -223,11 +219,10 @@ func TestStateStore_RestoreNode(t *testing.T) {
state := testStateStore(t)
node := mock.Node()
notify := notifyTest{
{desc: "table", item: watch.Item{Table: "nodes"}},
{desc: "node", item: watch.Item{Node: node.ID}},
}
notify.start(state)
notify := setupNotifyTest(
state,
watch.Item{Table: "nodes"},
watch.Item{Node: node.ID})
restore, err := state.Restore()
if err != nil {
@@ -256,11 +251,10 @@ func TestStateStore_UpsertJob_Job(t *testing.T) {
state := testStateStore(t)
job := mock.Job()
notify := notifyTest{
{desc: "table", item: watch.Item{Table: "jobs"}},
{desc: "job", item: watch.Item{Job: job.ID}},
}
notify.start(state)
notify := setupNotifyTest(
state,
watch.Item{Table: "jobs"},
watch.Item{Job: job.ID})
err := state.UpsertJob(1000, job)
if err != nil {
@@ -291,11 +285,10 @@ func TestStateStore_UpdateUpsertJob_Job(t *testing.T) {
state := testStateStore(t)
job := mock.Job()
notify := notifyTest{
{desc: "table", item: watch.Item{Table: "jobs"}},
{desc: "job", item: watch.Item{Job: job.ID}},
}
notify.start(state)
notify := setupNotifyTest(
state,
watch.Item{Table: "jobs"},
watch.Item{Job: job.ID})
err := state.UpsertJob(1000, job)
if err != nil {
@@ -340,11 +333,10 @@ func TestStateStore_DeleteJob_Job(t *testing.T) {
state := testStateStore(t)
job := mock.Job()
notify := notifyTest{
{desc: "table", item: watch.Item{Table: "jobs"}},
{desc: "job", item: watch.Item{Job: job.ID}},
}
notify.start(state)
notify := setupNotifyTest(
state,
watch.Item{Table: "jobs"},
watch.Item{Job: job.ID})
err := state.UpsertJob(1000, job)
if err != nil {
@@ -483,11 +475,10 @@ func TestStateStore_RestoreJob(t *testing.T) {
state := testStateStore(t)
job := mock.Job()
notify := notifyTest{
{desc: "table", item: watch.Item{Table: "jobs"}},
{desc: "job", item: watch.Item{Job: job.ID}},
}
notify.start(state)
notify := setupNotifyTest(
state,
watch.Item{Table: "jobs"},
watch.Item{Job: job.ID})
restore, err := state.Restore()
if err != nil {
@@ -574,11 +565,10 @@ func TestStateStore_UpsertEvals_Eval(t *testing.T) {
state := testStateStore(t)
eval := mock.Eval()
notify := notifyTest{
{desc: "table", item: watch.Item{Table: "evals"}},
{desc: "eval", item: watch.Item{Eval: eval.ID}},
}
notify.start(state)
notify := setupNotifyTest(
state,
watch.Item{Table: "evals"},
watch.Item{Eval: eval.ID})
err := state.UpsertEvals(1000, []*structs.Evaluation{eval})
if err != nil {
@@ -614,11 +604,10 @@ func TestStateStore_Update_UpsertEvals_Eval(t *testing.T) {
t.Fatalf("err: %v", err)
}
notify := notifyTest{
{desc: "table", item: watch.Item{Table: "evals"}},
{desc: "eval", item: watch.Item{Eval: eval.ID}},
}
notify.start(state)
notify := setupNotifyTest(
state,
watch.Item{Table: "evals"},
watch.Item{Eval: eval.ID})
eval2 := mock.Eval()
eval2.ID = eval.ID
@@ -661,16 +650,19 @@ func TestStateStore_DeleteEval_Eval(t *testing.T) {
alloc1 := mock.Alloc()
alloc2 := mock.Alloc()
notify := notifyTest{
{desc: "table", item: watch.Item{Table: "evals"}},
{desc: "eval1", item: watch.Item{Eval: eval1.ID}},
{desc: "eval2", item: watch.Item{Eval: eval2.ID}},
{desc: "alloc1", item: watch.Item{Alloc: alloc1.ID}},
{desc: "alloc2", item: watch.Item{Alloc: alloc2.ID}},
{desc: "allocnode1", item: watch.Item{AllocNode: alloc1.NodeID}},
{desc: "allocnode2", item: watch.Item{AllocNode: alloc2.NodeID}},
}
notify.start(state)
notify := setupNotifyTest(
state,
watch.Item{Table: "evals"},
watch.Item{Eval: eval1.ID},
watch.Item{Eval: eval2.ID},
watch.Item{Alloc: alloc1.ID},
watch.Item{Alloc: alloc2.ID},
watch.Item{AllocEval: alloc1.EvalID},
watch.Item{AllocEval: alloc2.EvalID},
watch.Item{AllocJob: alloc1.JobID},
watch.Item{AllocJob: alloc2.JobID},
watch.Item{AllocNode: alloc1.NodeID},
watch.Item{AllocNode: alloc2.NodeID})
err := state.UpsertEvals(1000, []*structs.Evaluation{eval1, eval2})
if err != nil {
@@ -813,11 +805,10 @@ func TestStateStore_RestoreEval(t *testing.T) {
state := testStateStore(t)
eval := mock.Eval()
notify := notifyTest{
{desc: "table", item: watch.Item{Table: "evals"}},
{desc: "eval", item: watch.Item{Eval: eval.ID}},
}
notify.start(state)
notify := setupNotifyTest(
state,
watch.Item{Table: "evals"},
watch.Item{Eval: eval.ID})
restore, err := state.Restore()
if err != nil {
@@ -846,14 +837,13 @@ func TestStateStore_UpdateAllocFromClient(t *testing.T) {
state := testStateStore(t)
alloc := mock.Alloc()
notify := notifyTest{
{desc: "table", item: watch.Item{Table: "allocs"}},
{desc: "alloc", item: watch.Item{Alloc: alloc.ID}},
{desc: "alloceval", item: watch.Item{AllocEval: alloc.EvalID}},
{desc: "allocjob", item: watch.Item{AllocJob: alloc.JobID}},
{desc: "allocnode", item: watch.Item{AllocNode: alloc.NodeID}},
}
notify.start(state)
notify := setupNotifyTest(
state,
watch.Item{Table: "allocs"},
watch.Item{Alloc: alloc.ID},
watch.Item{AllocEval: alloc.EvalID},
watch.Item{AllocJob: alloc.JobID},
watch.Item{AllocNode: alloc.NodeID})
err := state.UpsertAllocs(1000, []*structs.Allocation{alloc})
if err != nil {
@@ -894,14 +884,13 @@ func TestStateStore_UpsertAlloc_Alloc(t *testing.T) {
state := testStateStore(t)
alloc := mock.Alloc()
notify := notifyTest{
{desc: "table", item: watch.Item{Table: "allocs"}},
{desc: "alloc", item: watch.Item{Alloc: alloc.ID}},
{desc: "alloceval", item: watch.Item{AllocEval: alloc.EvalID}},
{desc: "allocjob", item: watch.Item{AllocJob: alloc.JobID}},
{desc: "allocnode", item: watch.Item{AllocNode: alloc.NodeID}},
}
notify.start(state)
notify := setupNotifyTest(
state,
watch.Item{Table: "allocs"},
watch.Item{Alloc: alloc.ID},
watch.Item{AllocEval: alloc.EvalID},
watch.Item{AllocJob: alloc.JobID},
watch.Item{AllocNode: alloc.NodeID})
err := state.UpsertAllocs(1000, []*structs.Allocation{alloc})
if err != nil {
@@ -941,14 +930,13 @@ func TestStateStore_UpdateAlloc_Alloc(t *testing.T) {
alloc2.ID = alloc.ID
alloc2.NodeID = alloc.NodeID + ".new"
notify := notifyTest{
{desc: "table", item: watch.Item{Table: "allocs"}},
{desc: "alloc", item: watch.Item{Alloc: alloc2.ID}},
{desc: "alloceval", item: watch.Item{AllocEval: alloc2.EvalID}},
{desc: "allocjob", item: watch.Item{AllocJob: alloc2.JobID}},
{desc: "allocnode", item: watch.Item{AllocNode: alloc2.NodeID}},
}
notify.start(state)
notify := setupNotifyTest(
state,
watch.Item{Table: "allocs"},
watch.Item{Alloc: alloc2.ID},
watch.Item{AllocEval: alloc2.EvalID},
watch.Item{AllocJob: alloc2.JobID},
watch.Item{AllocNode: alloc2.NodeID})
err = state.UpsertAllocs(1001, []*structs.Allocation{alloc2})
if err != nil {
@@ -1111,13 +1099,21 @@ func TestStateStore_Allocs(t *testing.T) {
func TestStateStore_RestoreAlloc(t *testing.T) {
state := testStateStore(t)
alloc := mock.Alloc()
notify := setupNotifyTest(
state,
watch.Item{Table: "allocs"},
watch.Item{Alloc: alloc.ID},
watch.Item{AllocEval: alloc.EvalID},
watch.Item{AllocJob: alloc.JobID},
watch.Item{AllocNode: alloc.NodeID})
restore, err := state.Restore()
if err != nil {
t.Fatalf("err: %v", err)
}
alloc := mock.Alloc()
err = restore.AllocRestore(alloc)
if err != nil {
t.Fatalf("err: %v", err)
@@ -1133,6 +1129,8 @@ func TestStateStore_RestoreAlloc(t *testing.T) {
if !reflect.DeepEqual(out, alloc) {
t.Fatalf("Bad: %#v %#v", out, alloc)
}
notify.verify(t)
}
func TestStateWatch_watch(t *testing.T) {
@@ -1172,15 +1170,30 @@ func TestStateWatch_stopWatch(t *testing.T) {
// Unsubscribe stop notifications
sw.stopWatch(watch.Item{Table: "foo"}, notify)
// Check that the group was removed
if _, ok := sw.items[watch.Item{Table: "foo"}]; ok {
t.Fatalf("should remove group")
}
// Check that we are not notified
sw.notify(watch.NewItems(watch.Item{Table: "foo"}))
if len(notify) != 0 {
t.Fatalf("should not notify")
}
}
func setupNotifyTest(state *StateStore, items ...watch.Item) notifyTest {
var n notifyTest
for _, item := range items {
ch := make(chan struct{}, 1)
state.Watch(watch.NewItems(item), ch)
n = append(n, &notifyTestCase{item, ch})
}
return n
}
// notifyTestCase is used to set up and verify watch triggers.
type notifyTestCase struct {
desc string
item watch.Item
ch chan struct{}
}
@@ -1188,19 +1201,11 @@ type notifyTestCase struct {
// notifyTest is a suite of notifyTestCases.
type notifyTest []*notifyTestCase
// start creates the notify channels and subscribes them.
func (n notifyTest) start(state *StateStore) {
for _, tcase := range n {
tcase.ch = make(chan struct{}, 1)
state.Watch(watch.NewItems(tcase.item), tcase.ch)
}
}
// verify ensures that each channel received a notification.
func (n notifyTest) verify(t *testing.T) {
for _, tcase := range n {
if len(tcase.ch) != 1 {
t.Fatalf("should notify %s", tcase.desc)
t.Fatalf("should notify %#v", tcase.item)
}
}
}