From 43ab32f96c4fedfe424b0c83c03e9d0b9a02c130 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Tue, 7 Feb 2017 21:22:48 -0800 Subject: [PATCH] Tests compile --- client/client_test.go | 10 ++- nomad/core_sched_test.go | 90 ++++++++++++++---------- nomad/eval_endpoint_test.go | 10 ++- nomad/fsm_test.go | 119 ++++++++++++++++++++------------ nomad/heartbeat_test.go | 4 +- nomad/job_endpoint_test.go | 58 ++++++++++------ nomad/leader_test.go | 13 ++-- nomad/node_endpoint_test.go | 57 +++++++++------ nomad/periodic_endpoint_test.go | 4 +- nomad/plan_apply_test.go | 12 ++-- nomad/system_endpoint_test.go | 7 +- nomad/worker_test.go | 10 ++- scheduler/generic_sched_test.go | 61 ++++++++++------ scheduler/system_sched_test.go | 27 +++++--- 14 files changed, 307 insertions(+), 175 deletions(-) diff --git a/client/client_test.go b/client/client_test.go index 0455ca3fa..975517245 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -15,6 +15,7 @@ import ( "testing" "time" + memdb "github.com/hashicorp/go-memdb" "github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/command/agent/consul" "github.com/hashicorp/nomad/nomad" @@ -477,7 +478,8 @@ func TestClient_UpdateAllocStatus(t *testing.T) { state.UpsertAllocs(101, []*structs.Allocation{alloc}) testutil.WaitForResult(func() (bool, error) { - out, err := state.AllocByID(alloc.ID) + ws := memdb.NewWatchSet() + out, err := state.AllocByID(ws, alloc.ID) if err != nil { return false, err } @@ -724,7 +726,8 @@ func TestClient_BlockedAllocations(t *testing.T) { // Wait for the node to be ready state := s1.State() testutil.WaitForResult(func() (bool, error) { - out, err := state.NodeByID(c1.Node().ID) + ws := memdb.NewWatchSet() + out, err := state.NodeByID(ws, c1.Node().ID) if err != nil { return false, err } @@ -753,7 +756,8 @@ func TestClient_BlockedAllocations(t *testing.T) { // Wait until the client downloads and starts the allocation testutil.WaitForResult(func() (bool, error) { - out, err := state.AllocByID(alloc.ID) + ws := memdb.NewWatchSet() + out, err := state.AllocByID(ws, alloc.ID) if err != nil { return false, err } diff --git a/nomad/core_sched_test.go b/nomad/core_sched_test.go index 72bd4bf66..3f1c6d247 100644 --- a/nomad/core_sched_test.go +++ b/nomad/core_sched_test.go @@ -4,6 +4,7 @@ import ( "testing" "time" + memdb "github.com/hashicorp/go-memdb" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/testutil" @@ -63,7 +64,8 @@ func TestCoreScheduler_EvalGC(t *testing.T) { } // Should be gone - out, err := state.EvalByID(eval.ID) + ws := memdb.NewWatchSet() + out, err := state.EvalByID(ws, eval.ID) if err != nil { t.Fatalf("err: %v", err) } @@ -71,7 +73,7 @@ func TestCoreScheduler_EvalGC(t *testing.T) { t.Fatalf("bad: %v", out) } - outA, err := state.AllocByID(alloc.ID) + outA, err := state.AllocByID(ws, alloc.ID) if err != nil { t.Fatalf("err: %v", err) } @@ -79,7 +81,7 @@ func TestCoreScheduler_EvalGC(t *testing.T) { t.Fatalf("bad: %v", outA) } - outA2, err := state.AllocByID(alloc2.ID) + outA2, err := state.AllocByID(ws, alloc2.ID) if err != nil { t.Fatalf("err: %v", err) } @@ -154,7 +156,8 @@ func TestCoreScheduler_EvalGC_Batch(t *testing.T) { } // Nothing should be gone - out, err := state.EvalByID(eval.ID) + ws := memdb.NewWatchSet() + out, err := state.EvalByID(ws, eval.ID) if err != nil { t.Fatalf("err: %v", err) } @@ -162,7 +165,7 @@ func TestCoreScheduler_EvalGC_Batch(t *testing.T) { t.Fatalf("bad: %v", out) } - outA, err := state.AllocByID(alloc.ID) + outA, err := state.AllocByID(ws, alloc.ID) if err != nil { t.Fatalf("err: %v", err) } @@ -170,7 +173,7 @@ func TestCoreScheduler_EvalGC_Batch(t *testing.T) { t.Fatalf("bad: %v", outA) } - outA2, err := state.AllocByID(alloc2.ID) + outA2, err := state.AllocByID(ws, alloc2.ID) if err != nil { t.Fatalf("err: %v", err) } @@ -178,7 +181,7 @@ func TestCoreScheduler_EvalGC_Batch(t *testing.T) { t.Fatalf("bad: %v", outA2) } - outB, err := state.JobByID(job.ID) + outB, err := state.JobByID(ws, job.ID) if err != nil { t.Fatalf("err: %v", err) } @@ -251,7 +254,8 @@ func TestCoreScheduler_EvalGC_Partial(t *testing.T) { } // Should not be gone - out, err := state.EvalByID(eval.ID) + ws := memdb.NewWatchSet() + out, err := state.EvalByID(ws, eval.ID) if err != nil { t.Fatalf("err: %v", err) } @@ -259,7 +263,7 @@ func TestCoreScheduler_EvalGC_Partial(t *testing.T) { t.Fatalf("bad: %v", out) } - outA, err := state.AllocByID(alloc3.ID) + outA, err := state.AllocByID(ws, alloc3.ID) if err != nil { t.Fatalf("err: %v", err) } @@ -268,7 +272,7 @@ func TestCoreScheduler_EvalGC_Partial(t *testing.T) { } // Should be gone - outB, err := state.AllocByID(alloc.ID) + outB, err := state.AllocByID(ws, alloc.ID) if err != nil { t.Fatalf("err: %v", err) } @@ -276,7 +280,7 @@ func TestCoreScheduler_EvalGC_Partial(t *testing.T) { t.Fatalf("bad: %v", outB) } - outC, err := state.AllocByID(alloc2.ID) + outC, err := state.AllocByID(ws, alloc2.ID) if err != nil { t.Fatalf("err: %v", err) } @@ -328,7 +332,8 @@ func TestCoreScheduler_EvalGC_Force(t *testing.T) { } // Should be gone - out, err := state.EvalByID(eval.ID) + ws := memdb.NewWatchSet() + out, err := state.EvalByID(ws, eval.ID) if err != nil { t.Fatalf("err: %v", err) } @@ -336,7 +341,7 @@ func TestCoreScheduler_EvalGC_Force(t *testing.T) { t.Fatalf("bad: %v", out) } - outA, err := state.AllocByID(alloc.ID) + outA, err := state.AllocByID(ws, alloc.ID) if err != nil { t.Fatalf("err: %v", err) } @@ -381,7 +386,8 @@ func TestCoreScheduler_NodeGC(t *testing.T) { } // Should be gone - out, err := state.NodeByID(node.ID) + ws := memdb.NewWatchSet() + out, err := state.NodeByID(ws, node.ID) if err != nil { t.Fatalf("err: %v", err) } @@ -434,7 +440,8 @@ func TestCoreScheduler_NodeGC_TerminalAllocs(t *testing.T) { } // Should be gone - out, err := state.NodeByID(node.ID) + ws := memdb.NewWatchSet() + out, err := state.NodeByID(ws, node.ID) if err != nil { t.Fatalf("err: %v", err) } @@ -489,7 +496,8 @@ func TestCoreScheduler_NodeGC_RunningAllocs(t *testing.T) { } // Should still be here - out, err := state.NodeByID(node.ID) + ws := memdb.NewWatchSet() + out, err := state.NodeByID(ws, node.ID) if err != nil { t.Fatalf("err: %v", err) } @@ -530,7 +538,8 @@ func TestCoreScheduler_NodeGC_Force(t *testing.T) { } // Should be gone - out, err := state.NodeByID(node.ID) + ws := memdb.NewWatchSet() + out, err := state.NodeByID(ws, node.ID) if err != nil { t.Fatalf("err: %v", err) } @@ -589,7 +598,8 @@ func TestCoreScheduler_JobGC_OutstandingEvals(t *testing.T) { } // Should still exist - out, err := state.JobByID(job.ID) + ws := memdb.NewWatchSet() + out, err := state.JobByID(ws, job.ID) if err != nil { t.Fatalf("err: %v", err) } @@ -597,7 +607,7 @@ func TestCoreScheduler_JobGC_OutstandingEvals(t *testing.T) { t.Fatalf("bad: %v", out) } - outE, err := state.EvalByID(eval.ID) + outE, err := state.EvalByID(ws, eval.ID) if err != nil { t.Fatalf("err: %v", err) } @@ -605,7 +615,7 @@ func TestCoreScheduler_JobGC_OutstandingEvals(t *testing.T) { t.Fatalf("bad: %v", outE) } - outE2, err := state.EvalByID(eval2.ID) + outE2, err := state.EvalByID(ws, eval2.ID) if err != nil { t.Fatalf("err: %v", err) } @@ -635,7 +645,7 @@ func TestCoreScheduler_JobGC_OutstandingEvals(t *testing.T) { } // Should not still exist - out, err = state.JobByID(job.ID) + out, err = state.JobByID(ws, job.ID) if err != nil { t.Fatalf("err: %v", err) } @@ -643,7 +653,7 @@ func TestCoreScheduler_JobGC_OutstandingEvals(t *testing.T) { t.Fatalf("bad: %v", out) } - outE, err = state.EvalByID(eval.ID) + outE, err = state.EvalByID(ws, eval.ID) if err != nil { t.Fatalf("err: %v", err) } @@ -651,7 +661,7 @@ func TestCoreScheduler_JobGC_OutstandingEvals(t *testing.T) { t.Fatalf("bad: %v", outE) } - outE2, err = state.EvalByID(eval2.ID) + outE2, err = state.EvalByID(ws, eval2.ID) if err != nil { t.Fatalf("err: %v", err) } @@ -724,7 +734,8 @@ func TestCoreScheduler_JobGC_OutstandingAllocs(t *testing.T) { } // Should still exist - out, err := state.JobByID(job.ID) + ws := memdb.NewWatchSet() + out, err := state.JobByID(ws, job.ID) if err != nil { t.Fatalf("err: %v", err) } @@ -732,7 +743,7 @@ func TestCoreScheduler_JobGC_OutstandingAllocs(t *testing.T) { t.Fatalf("bad: %v", out) } - outA, err := state.AllocByID(alloc.ID) + outA, err := state.AllocByID(ws, alloc.ID) if err != nil { t.Fatalf("err: %v", err) } @@ -740,7 +751,7 @@ func TestCoreScheduler_JobGC_OutstandingAllocs(t *testing.T) { t.Fatalf("bad: %v", outA) } - outA2, err := state.AllocByID(alloc2.ID) + outA2, err := state.AllocByID(ws, alloc2.ID) if err != nil { t.Fatalf("err: %v", err) } @@ -770,7 +781,7 @@ func TestCoreScheduler_JobGC_OutstandingAllocs(t *testing.T) { } // Should not still exist - out, err = state.JobByID(job.ID) + out, err = state.JobByID(ws, job.ID) if err != nil { t.Fatalf("err: %v", err) } @@ -778,7 +789,7 @@ func TestCoreScheduler_JobGC_OutstandingAllocs(t *testing.T) { t.Fatalf("bad: %v", out) } - outA, err = state.AllocByID(alloc.ID) + outA, err = state.AllocByID(ws, alloc.ID) if err != nil { t.Fatalf("err: %v", err) } @@ -786,7 +797,7 @@ func TestCoreScheduler_JobGC_OutstandingAllocs(t *testing.T) { t.Fatalf("bad: %v", outA) } - outA2, err = state.AllocByID(alloc2.ID) + outA2, err = state.AllocByID(ws, alloc2.ID) if err != nil { t.Fatalf("err: %v", err) } @@ -866,7 +877,8 @@ func TestCoreScheduler_JobGC_OneShot(t *testing.T) { } // Should still exist - out, err := state.JobByID(job.ID) + ws := memdb.NewWatchSet() + out, err := state.JobByID(ws, job.ID) if err != nil { t.Fatalf("err: %v", err) } @@ -874,7 +886,7 @@ func TestCoreScheduler_JobGC_OneShot(t *testing.T) { t.Fatalf("bad: %v", out) } - outE, err := state.EvalByID(eval.ID) + outE, err := state.EvalByID(ws, eval.ID) if err != nil { t.Fatalf("err: %v", err) } @@ -882,7 +894,7 @@ func TestCoreScheduler_JobGC_OneShot(t *testing.T) { t.Fatalf("bad: %v", outE) } - outE2, err := state.EvalByID(eval2.ID) + outE2, err := state.EvalByID(ws, eval2.ID) if err != nil { t.Fatalf("err: %v", err) } @@ -890,14 +902,14 @@ func TestCoreScheduler_JobGC_OneShot(t *testing.T) { t.Fatalf("bad: %v", outE2) } - outA, err := state.AllocByID(alloc.ID) + outA, err := state.AllocByID(ws, alloc.ID) if err != nil { t.Fatalf("err: %v", err) } if outA == nil { t.Fatalf("bad: %v", outA) } - outA2, err := state.AllocByID(alloc2.ID) + outA2, err := state.AllocByID(ws, alloc2.ID) if err != nil { t.Fatalf("err: %v", err) } @@ -948,7 +960,8 @@ func TestCoreScheduler_JobGC_Force(t *testing.T) { } // Shouldn't still exist - out, err := state.JobByID(job.ID) + ws := memdb.NewWatchSet() + out, err := state.JobByID(ws, job.ID) if err != nil { t.Fatalf("err: %v", err) } @@ -956,7 +969,7 @@ func TestCoreScheduler_JobGC_Force(t *testing.T) { t.Fatalf("bad: %v", out) } - outE, err := state.EvalByID(eval.ID) + outE, err := state.EvalByID(ws, eval.ID) if err != nil { t.Fatalf("err: %v", err) } @@ -1008,7 +1021,8 @@ func TestCoreScheduler_JobGC_NonGCable(t *testing.T) { } // Should still exist - out, err := state.JobByID(job.ID) + ws := memdb.NewWatchSet() + out, err := state.JobByID(ws, job.ID) if err != nil { t.Fatalf("err: %v", err) } @@ -1016,7 +1030,7 @@ func TestCoreScheduler_JobGC_NonGCable(t *testing.T) { t.Fatalf("bad: %v", out) } - outE, err := state.JobByID(job2.ID) + outE, err := state.JobByID(ws, job2.ID) if err != nil { t.Fatalf("err: %v", err) } diff --git a/nomad/eval_endpoint_test.go b/nomad/eval_endpoint_test.go index cf5473ca7..f5fb358e4 100644 --- a/nomad/eval_endpoint_test.go +++ b/nomad/eval_endpoint_test.go @@ -6,6 +6,7 @@ import ( "testing" "time" + memdb "github.com/hashicorp/go-memdb" "github.com/hashicorp/net-rpc-msgpackrpc" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" @@ -314,7 +315,8 @@ func TestEvalEndpoint_Update(t *testing.T) { } // Ensure updated - outE, err := s1.fsm.State().EvalByID(eval2.ID) + ws := memdb.NewWatchSet() + outE, err := s1.fsm.State().EvalByID(ws, eval2.ID) if err != nil { t.Fatalf("err: %v", err) } @@ -361,7 +363,8 @@ func TestEvalEndpoint_Create(t *testing.T) { } // Ensure created - outE, err := s1.fsm.State().EvalByID(eval1.ID) + ws := memdb.NewWatchSet() + outE, err := s1.fsm.State().EvalByID(ws, eval1.ID) if err != nil { t.Fatalf("err: %v", err) } @@ -397,7 +400,8 @@ func TestEvalEndpoint_Reap(t *testing.T) { } // Ensure deleted - outE, err := s1.fsm.State().EvalByID(eval1.ID) + ws := memdb.NewWatchSet() + outE, err := s1.fsm.State().EvalByID(ws, eval1.ID) if err != nil { t.Fatalf("err: %v", err) } diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index 1883a6c97..799570e47 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -8,6 +8,7 @@ import ( "testing" "time" + memdb "github.com/hashicorp/go-memdb" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/structs" @@ -92,7 +93,8 @@ func TestFSM_UpsertNode(t *testing.T) { } // Verify we are registered - n, err := fsm.State().NodeByID(req.Node.ID) + ws := memdb.NewWatchSet() + n, err := fsm.State().NodeByID(ws, req.Node.ID) if err != nil { t.Fatalf("err: %v", err) } @@ -153,7 +155,8 @@ func TestFSM_DeregisterNode(t *testing.T) { } // Verify we are NOT registered - node, err = fsm.State().NodeByID(req.Node.ID) + ws := memdb.NewWatchSet() + node, err = fsm.State().NodeByID(ws, req.Node.ID) if err != nil { t.Fatalf("err: %v", err) } @@ -200,7 +203,8 @@ func TestFSM_UpdateNodeStatus(t *testing.T) { } // Verify the status is ready. - node, err = fsm.State().NodeByID(req.Node.ID) + ws := memdb.NewWatchSet() + node, err = fsm.State().NodeByID(ws, req.Node.ID) if err != nil { t.Fatalf("err: %v", err) } @@ -252,7 +256,8 @@ func TestFSM_UpdateNodeDrain(t *testing.T) { } // Verify we are NOT registered - node, err = fsm.State().NodeByID(req.Node.ID) + ws := memdb.NewWatchSet() + node, err = fsm.State().NodeByID(ws, req.Node.ID) if err != nil { t.Fatalf("err: %v", err) } @@ -279,7 +284,8 @@ func TestFSM_RegisterJob(t *testing.T) { } // Verify we are registered - jobOut, err := fsm.State().JobByID(req.Job.ID) + ws := memdb.NewWatchSet() + jobOut, err := fsm.State().JobByID(ws, req.Job.ID) if err != nil { t.Fatalf("err: %v", err) } @@ -296,7 +302,7 @@ func TestFSM_RegisterJob(t *testing.T) { } // Verify the launch time was tracked. - launchOut, err := fsm.State().PeriodicLaunchByID(req.Job.ID) + launchOut, err := fsm.State().PeriodicLaunchByID(ws, req.Job.ID) if err != nil { t.Fatalf("err: %v", err) } @@ -339,7 +345,8 @@ func TestFSM_DeregisterJob(t *testing.T) { } // Verify we are NOT registered - jobOut, err := fsm.State().JobByID(req.Job.ID) + ws := memdb.NewWatchSet() + jobOut, err := fsm.State().JobByID(ws, req.Job.ID) if err != nil { t.Fatalf("err: %v", err) } @@ -353,7 +360,7 @@ func TestFSM_DeregisterJob(t *testing.T) { } // Verify it was removed from the periodic launch table. - launchOut, err := fsm.State().PeriodicLaunchByID(req.Job.ID) + launchOut, err := fsm.State().PeriodicLaunchByID(ws, req.Job.ID) if err != nil { t.Fatalf("err: %v", err) } @@ -380,7 +387,8 @@ func TestFSM_UpdateEval(t *testing.T) { } // Verify we are registered - eval, err := fsm.State().EvalByID(req.Evals[0].ID) + ws := memdb.NewWatchSet() + eval, err := fsm.State().EvalByID(ws, req.Evals[0].ID) if err != nil { t.Fatalf("err: %v", err) } @@ -421,7 +429,8 @@ func TestFSM_UpdateEval_Blocked(t *testing.T) { } // Verify we are registered - out, err := fsm.State().EvalByID(eval.ID) + ws := memdb.NewWatchSet() + out, err := fsm.State().EvalByID(ws, eval.ID) if err != nil { t.Fatalf("err: %v", err) } @@ -474,7 +483,8 @@ func TestFSM_UpdateEval_Untrack(t *testing.T) { } // Verify we are registered - out, err := fsm.State().EvalByID(eval.ID) + ws := memdb.NewWatchSet() + out, err := fsm.State().EvalByID(ws, eval.ID) if err != nil { t.Fatalf("err: %v", err) } @@ -529,7 +539,8 @@ func TestFSM_UpdateEval_NoUntrack(t *testing.T) { } // Verify we are registered - out, err := fsm.State().EvalByID(eval.ID) + ws := memdb.NewWatchSet() + out, err := fsm.State().EvalByID(ws, eval.ID) if err != nil { t.Fatalf("err: %v", err) } @@ -584,7 +595,8 @@ func TestFSM_DeleteEval(t *testing.T) { } // Verify we are NOT registered - eval, err = fsm.State().EvalByID(req.Evals[0].ID) + ws := memdb.NewWatchSet() + eval, err = fsm.State().EvalByID(ws, req.Evals[0].ID) if err != nil { t.Fatalf("err: %v", err) } @@ -612,7 +624,8 @@ func TestFSM_UpsertAllocs(t *testing.T) { } // Verify we are registered - out, err := fsm.State().AllocByID(alloc.ID) + ws := memdb.NewWatchSet() + out, err := fsm.State().AllocByID(ws, alloc.ID) if err != nil { t.Fatalf("err: %v", err) } @@ -640,7 +653,7 @@ func TestFSM_UpsertAllocs(t *testing.T) { } // Verify we are evicted - out, err = fsm.State().AllocByID(alloc.ID) + out, err = fsm.State().AllocByID(ws, alloc.ID) if err != nil { t.Fatalf("err: %v", err) } @@ -671,7 +684,8 @@ func TestFSM_UpsertAllocs_SharedJob(t *testing.T) { } // Verify we are registered - out, err := fsm.State().AllocByID(alloc.ID) + ws := memdb.NewWatchSet() + out, err := fsm.State().AllocByID(ws, alloc.ID) if err != nil { t.Fatalf("err: %v", err) } @@ -708,7 +722,7 @@ func TestFSM_UpsertAllocs_SharedJob(t *testing.T) { } // Verify we are evicted - out, err = fsm.State().AllocByID(alloc.ID) + out, err = fsm.State().AllocByID(ws, alloc.ID) if err != nil { t.Fatalf("err: %v", err) } @@ -743,7 +757,8 @@ func TestFSM_UpsertAllocs_StrippedResources(t *testing.T) { } // Verify we are registered - out, err := fsm.State().AllocByID(alloc.ID) + ws := memdb.NewWatchSet() + out, err := fsm.State().AllocByID(ws, alloc.ID) if err != nil { t.Fatalf("err: %v", err) } @@ -808,7 +823,8 @@ func TestFSM_UpdateAllocFromClient_Unblock(t *testing.T) { } // Verify we are updated - out, err := fsm.State().AllocByID(alloc.ID) + ws := memdb.NewWatchSet() + out, err := fsm.State().AllocByID(ws, alloc.ID) if err != nil { t.Fatalf("err: %v", err) } @@ -818,7 +834,7 @@ func TestFSM_UpdateAllocFromClient_Unblock(t *testing.T) { t.Fatalf("bad: %#v %#v", clientAlloc, out) } - out, err = fsm.State().AllocByID(alloc2.ID) + out, err = fsm.State().AllocByID(ws, alloc2.ID) if err != nil { t.Fatalf("err: %v", err) } @@ -868,7 +884,8 @@ func TestFSM_UpdateAllocFromClient(t *testing.T) { } // Verify we are registered - out, err := fsm.State().AllocByID(alloc.ID) + ws := memdb.NewWatchSet() + out, err := fsm.State().AllocByID(ws, alloc.ID) if err != nil { t.Fatalf("err: %v", err) } @@ -899,7 +916,8 @@ func TestFSM_UpsertVaultAccessor(t *testing.T) { } // Verify we are registered - out1, err := fsm.State().VaultAccessor(va.Accessor) + ws := memdb.NewWatchSet() + out1, err := fsm.State().VaultAccessor(ws, va.Accessor) if err != nil { t.Fatalf("err: %v", err) } @@ -909,7 +927,7 @@ func TestFSM_UpsertVaultAccessor(t *testing.T) { if out1.CreateIndex != 1 { t.Fatalf("bad index: %d", out1.CreateIndex) } - out2, err := fsm.State().VaultAccessor(va2.Accessor) + out2, err := fsm.State().VaultAccessor(ws, va2.Accessor) if err != nil { t.Fatalf("err: %v", err) } @@ -953,7 +971,8 @@ func TestFSM_DeregisterVaultAccessor(t *testing.T) { t.Fatalf("resp: %v", resp) } - out1, err := fsm.State().VaultAccessor(va.Accessor) + ws := memdb.NewWatchSet() + out1, err := fsm.State().VaultAccessor(ws, va.Accessor) if err != nil { t.Fatalf("err: %v", err) } @@ -1005,8 +1024,9 @@ func TestFSM_SnapshotRestore_Nodes(t *testing.T) { // Verify the contents fsm2 := testSnapshotRestore(t, fsm) state2 := fsm2.State() - out1, _ := state2.NodeByID(node1.ID) - out2, _ := state2.NodeByID(node2.ID) + ws := memdb.NewWatchSet() + out1, _ := state2.NodeByID(ws, node1.ID) + out2, _ := state2.NodeByID(ws, node2.ID) if !reflect.DeepEqual(node1, out1) { t.Fatalf("bad: \n%#v\n%#v", out1, node1) } @@ -1025,10 +1045,11 @@ func TestFSM_SnapshotRestore_Jobs(t *testing.T) { state.UpsertJob(1001, job2) // Verify the contents + ws := memdb.NewWatchSet() fsm2 := testSnapshotRestore(t, fsm) state2 := fsm2.State() - out1, _ := state2.JobByID(job1.ID) - out2, _ := state2.JobByID(job2.ID) + out1, _ := state2.JobByID(ws, job1.ID) + out2, _ := state2.JobByID(ws, job2.ID) if !reflect.DeepEqual(job1, out1) { t.Fatalf("bad: \n%#v\n%#v", out1, job1) } @@ -1049,8 +1070,9 @@ func TestFSM_SnapshotRestore_Evals(t *testing.T) { // Verify the contents fsm2 := testSnapshotRestore(t, fsm) state2 := fsm2.State() - out1, _ := state2.EvalByID(eval1.ID) - out2, _ := state2.EvalByID(eval2.ID) + ws := memdb.NewWatchSet() + out1, _ := state2.EvalByID(ws, eval1.ID) + out2, _ := state2.EvalByID(ws, eval2.ID) if !reflect.DeepEqual(eval1, out1) { t.Fatalf("bad: \n%#v\n%#v", out1, eval1) } @@ -1073,8 +1095,9 @@ func TestFSM_SnapshotRestore_Allocs(t *testing.T) { // Verify the contents fsm2 := testSnapshotRestore(t, fsm) state2 := fsm2.State() - out1, _ := state2.AllocByID(alloc1.ID) - out2, _ := state2.AllocByID(alloc2.ID) + ws := memdb.NewWatchSet() + out1, _ := state2.AllocByID(ws, alloc1.ID) + out2, _ := state2.AllocByID(ws, alloc2.ID) if !reflect.DeepEqual(alloc1, out1) { t.Fatalf("bad: \n%#v\n%#v", out1, alloc1) } @@ -1099,8 +1122,9 @@ func TestFSM_SnapshotRestore_Allocs_NoSharedResources(t *testing.T) { // Verify the contents fsm2 := testSnapshotRestore(t, fsm) state2 := fsm2.State() - out1, _ := state2.AllocByID(alloc1.ID) - out2, _ := state2.AllocByID(alloc2.ID) + ws := memdb.NewWatchSet() + out1, _ := state2.AllocByID(ws, alloc1.ID) + out2, _ := state2.AllocByID(ws, alloc2.ID) alloc1.SharedResources = &structs.Resources{DiskMB: 150} alloc2.SharedResources = &structs.Resources{DiskMB: 150} @@ -1167,8 +1191,9 @@ func TestFSM_SnapshotRestore_PeriodicLaunches(t *testing.T) { // Verify the contents fsm2 := testSnapshotRestore(t, fsm) state2 := fsm2.State() - out1, _ := state2.PeriodicLaunchByID(launch1.ID) - out2, _ := state2.PeriodicLaunchByID(launch2.ID) + ws := memdb.NewWatchSet() + out1, _ := state2.PeriodicLaunchByID(ws, launch1.ID) + out2, _ := state2.PeriodicLaunchByID(ws, launch2.ID) if !reflect.DeepEqual(launch1, out1) { t.Fatalf("bad: \n%#v\n%#v", out1, job1) } @@ -1184,17 +1209,18 @@ func TestFSM_SnapshotRestore_JobSummary(t *testing.T) { job1 := mock.Job() state.UpsertJob(1000, job1) - js1, _ := state.JobSummaryByID(job1.ID) + ws := memdb.NewWatchSet() + js1, _ := state.JobSummaryByID(ws, job1.ID) job2 := mock.Job() state.UpsertJob(1001, job2) - js2, _ := state.JobSummaryByID(job2.ID) + js2, _ := state.JobSummaryByID(ws, job2.ID) // Verify the contents fsm2 := testSnapshotRestore(t, fsm) state2 := fsm2.State() - out1, _ := state2.JobSummaryByID(job1.ID) - out2, _ := state2.JobSummaryByID(job2.ID) + out1, _ := state2.JobSummaryByID(ws, job1.ID) + out2, _ := state2.JobSummaryByID(ws, job2.ID) if !reflect.DeepEqual(js1, out1) { t.Fatalf("bad: \n%#v\n%#v", js1, out1) } @@ -1214,8 +1240,9 @@ func TestFSM_SnapshotRestore_VaultAccessors(t *testing.T) { // Verify the contents fsm2 := testSnapshotRestore(t, fsm) state2 := fsm2.State() - out1, _ := state2.VaultAccessor(a1.Accessor) - out2, _ := state2.VaultAccessor(a2.Accessor) + ws := memdb.NewWatchSet() + out1, _ := state2.VaultAccessor(ws, a1.Accessor) + out2, _ := state2.VaultAccessor(ws, a2.Accessor) if !reflect.DeepEqual(a1, out1) { t.Fatalf("bad: \n%#v\n%#v", out1, a1) } @@ -1246,7 +1273,8 @@ func TestFSM_SnapshotRestore_AddMissingSummary(t *testing.T) { state2 := fsm2.State() latestIndex, _ := state.LatestIndex() - out, _ := state2.JobSummaryByID(alloc.Job.ID) + ws := memdb.NewWatchSet() + out, _ := state2.JobSummaryByID(ws, alloc.Job.ID) expected := structs.JobSummary{ JobID: alloc.Job.ID, Summary: map[string]structs.TaskGroupSummary{ @@ -1297,7 +1325,8 @@ func TestFSM_ReconcileSummaries(t *testing.T) { t.Fatalf("resp: %v", resp) } - out1, _ := state.JobSummaryByID(job1.ID) + ws := memdb.NewWatchSet() + out1, _ := state.JobSummaryByID(ws, job1.ID) expected := structs.JobSummary{ JobID: job1.ID, Summary: map[string]structs.TaskGroupSummary{ @@ -1315,7 +1344,7 @@ func TestFSM_ReconcileSummaries(t *testing.T) { // This exercises the code path which adds the allocations made by the // planner and the number of unplaced allocations in the reconcile summaries // codepath - out2, _ := state.JobSummaryByID(alloc.Job.ID) + out2, _ := state.JobSummaryByID(ws, alloc.Job.ID) expected = structs.JobSummary{ JobID: alloc.Job.ID, Summary: map[string]structs.TaskGroupSummary{ diff --git a/nomad/heartbeat_test.go b/nomad/heartbeat_test.go index 7ab5495a8..dc5b29c4c 100644 --- a/nomad/heartbeat_test.go +++ b/nomad/heartbeat_test.go @@ -5,6 +5,7 @@ import ( "testing" "time" + memdb "github.com/hashicorp/go-memdb" "github.com/hashicorp/net-rpc-msgpackrpc" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" @@ -132,7 +133,8 @@ func TestInvalidateHeartbeat(t *testing.T) { s1.invalidateHeartbeat(node.ID) // Check it is updated - out, err := state.NodeByID(node.ID) + ws := memdb.NewWatchSet() + out, err := state.NodeByID(ws, node.ID) if err != nil { t.Fatalf("err: %v", err) } diff --git a/nomad/job_endpoint_test.go b/nomad/job_endpoint_test.go index e2f17166b..cbadecdcb 100644 --- a/nomad/job_endpoint_test.go +++ b/nomad/job_endpoint_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + memdb "github.com/hashicorp/go-memdb" "github.com/hashicorp/net-rpc-msgpackrpc" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" @@ -39,7 +40,8 @@ func TestJobEndpoint_Register(t *testing.T) { // Check for the node in the FSM state := s1.fsm.State() - out, err := state.JobByID(job.ID) + ws := memdb.NewWatchSet() + out, err := state.JobByID(ws, job.ID) if err != nil { t.Fatalf("err: %v", err) } @@ -56,7 +58,7 @@ func TestJobEndpoint_Register(t *testing.T) { } // Lookup the evaluation - eval, err := state.EvalByID(resp.EvalID) + eval, err := state.EvalByID(ws, resp.EvalID) if err != nil { t.Fatalf("err: %v", err) } @@ -185,7 +187,8 @@ func TestJobEndpoint_Register_Existing(t *testing.T) { // Check for the node in the FSM state := s1.fsm.State() - out, err := state.JobByID(job.ID) + ws := memdb.NewWatchSet() + out, err := state.JobByID(ws, job.ID) if err != nil { t.Fatalf("err: %v", err) } @@ -200,7 +203,7 @@ func TestJobEndpoint_Register_Existing(t *testing.T) { } // Lookup the evaluation - eval, err := state.EvalByID(resp.EvalID) + eval, err := state.EvalByID(ws, resp.EvalID) if err != nil { t.Fatalf("err: %v", err) } @@ -257,7 +260,8 @@ func TestJobEndpoint_Register_Periodic(t *testing.T) { // Check for the node in the FSM state := s1.fsm.State() - out, err := state.JobByID(job.ID) + ws := memdb.NewWatchSet() + out, err := state.JobByID(ws, job.ID) if err != nil { t.Fatalf("err: %v", err) } @@ -306,7 +310,8 @@ func TestJobEndpoint_Register_ParameterizedJob(t *testing.T) { // Check for the job in the FSM state := s1.fsm.State() - out, err := state.JobByID(job.ID) + ws := memdb.NewWatchSet() + out, err := state.JobByID(ws, job.ID) if err != nil { t.Fatalf("err: %v", err) } @@ -365,7 +370,8 @@ func TestJobEndpoint_Register_EnforceIndex(t *testing.T) { // Check for the node in the FSM state := s1.fsm.State() - out, err := state.JobByID(job.ID) + ws := memdb.NewWatchSet() + out, err := state.JobByID(ws, job.ID) if err != nil { t.Fatalf("err: %v", err) } @@ -421,7 +427,7 @@ func TestJobEndpoint_Register_EnforceIndex(t *testing.T) { t.Fatalf("bad index: %d", resp.Index) } - out, err = state.JobByID(job.ID) + out, err = state.JobByID(ws, job.ID) if err != nil { t.Fatalf("err: %v", err) } @@ -498,7 +504,8 @@ func TestJobEndpoint_Register_Vault_AllowUnauthenticated(t *testing.T) { // Check for the job in the FSM state := s1.fsm.State() - out, err := state.JobByID(job.ID) + ws := memdb.NewWatchSet() + out, err := state.JobByID(ws, job.ID) if err != nil { t.Fatalf("err: %v", err) } @@ -621,7 +628,8 @@ func TestJobEndpoint_Register_Vault_Policies(t *testing.T) { // Check for the job in the FSM state := s1.fsm.State() - out, err := state.JobByID(job.ID) + ws := memdb.NewWatchSet() + out, err := state.JobByID(ws, job.ID) if err != nil { t.Fatalf("err: %v", err) } @@ -664,7 +672,7 @@ func TestJobEndpoint_Register_Vault_Policies(t *testing.T) { } // Check for the job in the FSM - out, err = state.JobByID(job2.ID) + out, err = state.JobByID(ws, job2.ID) if err != nil { t.Fatalf("err: %v", err) } @@ -719,7 +727,8 @@ func TestJobEndpoint_Evaluate(t *testing.T) { // Lookup the evaluation state := s1.fsm.State() - eval, err := state.EvalByID(resp.EvalID) + ws := memdb.NewWatchSet() + eval, err := state.EvalByID(ws, resp.EvalID) if err != nil { t.Fatalf("err: %v", err) } @@ -859,8 +868,9 @@ func TestJobEndpoint_Deregister(t *testing.T) { } // Check for the node in the FSM + ws := memdb.NewWatchSet() state := s1.fsm.State() - out, err := state.JobByID(job.ID) + out, err := state.JobByID(ws, job.ID) if err != nil { t.Fatalf("err: %v", err) } @@ -869,7 +879,7 @@ func TestJobEndpoint_Deregister(t *testing.T) { } // Lookup the evaluation - eval, err := state.EvalByID(resp2.EvalID) + eval, err := state.EvalByID(ws, resp2.EvalID) if err != nil { t.Fatalf("err: %v", err) } @@ -924,7 +934,8 @@ func TestJobEndpoint_Deregister_NonExistent(t *testing.T) { // Lookup the evaluation state := s1.fsm.State() - eval, err := state.EvalByID(resp2.EvalID) + ws := memdb.NewWatchSet() + eval, err := state.EvalByID(ws, resp2.EvalID) if err != nil { t.Fatalf("err: %v", err) } @@ -991,7 +1002,8 @@ func TestJobEndpoint_Deregister_Periodic(t *testing.T) { // Check for the node in the FSM state := s1.fsm.State() - out, err := state.JobByID(job.ID) + ws := memdb.NewWatchSet() + out, err := state.JobByID(ws, job.ID) if err != nil { t.Fatalf("err: %v", err) } @@ -1042,7 +1054,8 @@ func TestJobEndpoint_Deregister_ParameterizedJob(t *testing.T) { // Check for the node in the FSM state := s1.fsm.State() - out, err := state.JobByID(job.ID) + ws := memdb.NewWatchSet() + out, err := state.JobByID(ws, job.ID) if err != nil { t.Fatalf("err: %v", err) } @@ -1782,7 +1795,8 @@ func TestJobEndpoint_ImplicitConstraints_Vault(t *testing.T) { // Check for the job in the FSM state := s1.fsm.State() - out, err := state.JobByID(job.ID) + ws := memdb.NewWatchSet() + out, err := state.JobByID(ws, job.ID) if err != nil { t.Fatalf("err: %v", err) } @@ -1837,7 +1851,8 @@ func TestJobEndpoint_ImplicitConstraints_Signals(t *testing.T) { // Check for the job in the FSM state := s1.fsm.State() - out, err := state.JobByID(job.ID) + ws := memdb.NewWatchSet() + out, err := state.JobByID(ws, job.ID) if err != nil { t.Fatalf("err: %v", err) } @@ -2078,7 +2093,8 @@ func TestJobEndpoint_Dispatch(t *testing.T) { } state := s1.fsm.State() - out, err := state.JobByID(dispatchResp.DispatchedJobID) + ws := memdb.NewWatchSet() + out, err := state.JobByID(ws, dispatchResp.DispatchedJobID) if err != nil { t.Fatalf("err: %v", err) } @@ -2093,7 +2109,7 @@ func TestJobEndpoint_Dispatch(t *testing.T) { } // Lookup the evaluation - eval, err := state.EvalByID(dispatchResp.EvalID) + eval, err := state.EvalByID(ws, dispatchResp.EvalID) if err != nil { t.Fatalf("err: %v", err) } diff --git a/nomad/leader_test.go b/nomad/leader_test.go index 71b4e7878..987e716bc 100644 --- a/nomad/leader_test.go +++ b/nomad/leader_test.go @@ -6,6 +6,7 @@ import ( "testing" "time" + memdb "github.com/hashicorp/go-memdb" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/testutil" @@ -406,7 +407,8 @@ func TestLeader_PeriodicDispatcher_Restore_NoEvals(t *testing.T) { } // Check that an eval was made. - last, err := s1.fsm.State().PeriodicLaunchByID(job.ID) + ws := memdb.NewWatchSet() + last, err := s1.fsm.State().PeriodicLaunchByID(ws, job.ID) if err != nil || last == nil { t.Fatalf("failed to get periodic launch time: %v", err) } @@ -457,7 +459,8 @@ func TestLeader_PeriodicDispatcher_Restore_Evals(t *testing.T) { } // Check that an eval was made. - last, err := s1.fsm.State().PeriodicLaunchByID(job.ID) + ws := memdb.NewWatchSet() + last, err := s1.fsm.State().PeriodicLaunchByID(ws, job.ID) if err != nil || last == nil { t.Fatalf("failed to get periodic launch time: %v", err) } @@ -508,7 +511,8 @@ func TestLeader_ReapFailedEval(t *testing.T) { // Wait updated evaluation state := s1.fsm.State() testutil.WaitForResult(func() (bool, error) { - out, err := state.EvalByID(eval.ID) + ws := memdb.NewWatchSet() + out, err := state.EvalByID(ws, eval.ID) if err != nil { return false, err } @@ -535,7 +539,8 @@ func TestLeader_ReapDuplicateEval(t *testing.T) { // Wait for the evaluation to marked as cancelled state := s1.fsm.State() testutil.WaitForResult(func() (bool, error) { - out, err := state.EvalByID(eval2.ID) + ws := memdb.NewWatchSet() + out, err := state.EvalByID(ws, eval2.ID) if err != nil { return false, err } diff --git a/nomad/node_endpoint_test.go b/nomad/node_endpoint_test.go index 749e48b83..e8ee41c8a 100644 --- a/nomad/node_endpoint_test.go +++ b/nomad/node_endpoint_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + memdb "github.com/hashicorp/go-memdb" "github.com/hashicorp/net-rpc-msgpackrpc" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" @@ -38,7 +39,8 @@ func TestClientEndpoint_Register(t *testing.T) { // Check for the node in the FSM state := s1.fsm.State() - out, err := state.NodeByID(node.ID) + ws := memdb.NewWatchSet() + out, err := state.NodeByID(ws, node.ID) if err != nil { t.Fatalf("err: %v", err) } @@ -85,7 +87,8 @@ func TestClientEndpoint_Register_NoSecret(t *testing.T) { // Check for the node in the FSM state := s1.fsm.State() - out, err := state.NodeByID(node.ID) + ws := memdb.NewWatchSet() + out, err := state.NodeByID(ws, node.ID) if err != nil { t.Fatalf("err: %v", err) } @@ -161,7 +164,8 @@ func TestClientEndpoint_Deregister(t *testing.T) { // Check for the node in the FSM state := s1.fsm.State() - out, err := state.NodeByID(node.ID) + ws := memdb.NewWatchSet() + out, err := state.NodeByID(ws, node.ID) if err != nil { t.Fatalf("err: %v", err) } @@ -215,7 +219,8 @@ func TestClientEndpoint_Deregister_Vault(t *testing.T) { } // Check for the node in the FSM - out, err := state.NodeByID(node.ID) + ws := memdb.NewWatchSet() + out, err := state.NodeByID(ws, node.ID) if err != nil { t.Fatalf("err: %v", err) } @@ -276,7 +281,8 @@ func TestClientEndpoint_UpdateStatus(t *testing.T) { // Check for the node in the FSM state := s1.fsm.State() - out, err := state.NodeByID(node.ID) + ws := memdb.NewWatchSet() + out, err := state.NodeByID(ws, node.ID) if err != nil { t.Fatalf("err: %v", err) } @@ -384,7 +390,8 @@ func TestClientEndpoint_Register_GetEvals(t *testing.T) { } evalID := resp.EvalIDs[0] - eval, err := state.EvalByID(evalID) + ws := memdb.NewWatchSet() + eval, err := state.EvalByID(ws, evalID) if err != nil { t.Fatalf("could not get eval %v", evalID) } @@ -394,7 +401,7 @@ func TestClientEndpoint_Register_GetEvals(t *testing.T) { } // Check for the node in the FSM - out, err := state.NodeByID(node.ID) + out, err := state.NodeByID(ws, node.ID) if err != nil { t.Fatalf("err: %v", err) } @@ -490,7 +497,8 @@ func TestClientEndpoint_UpdateStatus_GetEvals(t *testing.T) { } evalID := resp2.EvalIDs[0] - eval, err := state.EvalByID(evalID) + ws := memdb.NewWatchSet() + eval, err := state.EvalByID(ws, evalID) if err != nil { t.Fatalf("could not get eval %v", evalID) } @@ -506,7 +514,7 @@ func TestClientEndpoint_UpdateStatus_GetEvals(t *testing.T) { } // Check for the node in the FSM - out, err := state.NodeByID(node.ID) + out, err := state.NodeByID(ws, node.ID) if err != nil { t.Fatalf("err: %v", err) } @@ -627,7 +635,8 @@ func TestClientEndpoint_UpdateDrain(t *testing.T) { // Check for the node in the FSM state := s1.fsm.State() - out, err := state.NodeByID(node.ID) + ws := memdb.NewWatchSet() + out, err := state.NodeByID(ws, node.ID) if err != nil { t.Fatalf("err: %v", err) } @@ -683,11 +692,12 @@ func TestClientEndpoint_Drain_Down(t *testing.T) { // Wait for the scheduler to create an allocation testutil.WaitForResult(func() (bool, error) { - allocs, err := s1.fsm.state.AllocsByJob(job.ID, true) + ws := memdb.NewWatchSet() + allocs, err := s1.fsm.state.AllocsByJob(ws, job.ID, true) if err != nil { return false, err } - allocs1, err := s1.fsm.state.AllocsByJob(job1.ID, true) + allocs1, err := s1.fsm.state.AllocsByJob(ws, job1.ID, true) if err != nil { return false, err } @@ -719,7 +729,8 @@ func TestClientEndpoint_Drain_Down(t *testing.T) { // Ensure that the allocation has transitioned to lost testutil.WaitForResult(func() (bool, error) { - summary, err := s1.fsm.state.JobSummaryByID(job.ID) + ws := memdb.NewWatchSet() + summary, err := s1.fsm.state.JobSummaryByID(ws, job.ID) if err != nil { return false, err } @@ -739,7 +750,7 @@ func TestClientEndpoint_Drain_Down(t *testing.T) { return false, fmt.Errorf("expected: %#v, actual: %#v", expectedSummary, summary) } - summary1, err := s1.fsm.state.JobSummaryByID(job1.ID) + summary1, err := s1.fsm.state.JobSummaryByID(ws, job1.ID) if err != nil { return false, err } @@ -1289,7 +1300,8 @@ func TestClientEndpoint_UpdateAlloc(t *testing.T) { } // Lookup the alloc - out, err := state.AllocByID(alloc.ID) + ws := memdb.NewWatchSet() + out, err := state.AllocByID(ws, alloc.ID) if err != nil { t.Fatalf("err: %v", err) } @@ -1344,7 +1356,8 @@ func TestClientEndpoint_BatchUpdate(t *testing.T) { } // Lookup the alloc - out, err := state.AllocByID(alloc.ID) + ws := memdb.NewWatchSet() + out, err := state.AllocByID(ws, alloc.ID) if err != nil { t.Fatalf("err: %v", err) } @@ -1415,7 +1428,8 @@ func TestClientEndpoint_UpdateAlloc_Vault(t *testing.T) { } // Lookup the alloc - out, err := state.AllocByID(alloc.ID) + ws := memdb.NewWatchSet() + out, err := state.AllocByID(ws, alloc.ID) if err != nil { t.Fatalf("err: %v", err) } @@ -1460,9 +1474,10 @@ func TestClientEndpoint_CreateNodeEvals(t *testing.T) { } // Lookup the evaluations + ws := memdb.NewWatchSet() evalByType := make(map[string]*structs.Evaluation, 2) for _, id := range ids { - eval, err := state.EvalByID(id) + eval, err := state.EvalByID(ws, id) if err != nil { t.Fatalf("err: %v", err) } @@ -1559,7 +1574,8 @@ func TestClientEndpoint_Evaluate(t *testing.T) { } // Lookup the evaluation - eval, err := state.EvalByID(ids[0]) + ws := memdb.NewWatchSet() + eval, err := state.EvalByID(ws, ids[0]) if err != nil { t.Fatalf("err: %v", err) } @@ -1936,7 +1952,8 @@ func TestClientEndpoint_DeriveVaultToken(t *testing.T) { } // Check the state store and ensure that we created a VaultAccessor - va, err := state.VaultAccessor(accessor) + ws := memdb.NewWatchSet() + va, err := state.VaultAccessor(ws, accessor) if err != nil { t.Fatalf("bad: %v", err) } diff --git a/nomad/periodic_endpoint_test.go b/nomad/periodic_endpoint_test.go index 295070162..fb6cbcec2 100644 --- a/nomad/periodic_endpoint_test.go +++ b/nomad/periodic_endpoint_test.go @@ -3,6 +3,7 @@ package nomad import ( "testing" + memdb "github.com/hashicorp/go-memdb" "github.com/hashicorp/net-rpc-msgpackrpc" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" @@ -42,7 +43,8 @@ func TestPeriodicEndpoint_Force(t *testing.T) { } // Lookup the evaluation - eval, err := state.EvalByID(resp.EvalID) + ws := memdb.NewWatchSet() + eval, err := state.EvalByID(ws, resp.EvalID) if err != nil { t.Fatalf("err: %v", err) } diff --git a/nomad/plan_apply_test.go b/nomad/plan_apply_test.go index 2584556b0..7b0e3e659 100644 --- a/nomad/plan_apply_test.go +++ b/nomad/plan_apply_test.go @@ -4,6 +4,7 @@ import ( "reflect" "testing" + memdb "github.com/hashicorp/go-memdb" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/testutil" @@ -88,7 +89,8 @@ func TestPlanApply_applyPlan(t *testing.T) { } // Verify our optimistic snapshot is updated - if out, err := snap.AllocByID(alloc.ID); err != nil || out == nil { + ws := memdb.NewWatchSet() + if out, err := snap.AllocByID(ws, alloc.ID); err != nil || out == nil { t.Fatalf("bad: %v %v", out, err) } @@ -102,7 +104,7 @@ func TestPlanApply_applyPlan(t *testing.T) { } // Lookup the allocation - out, err := s1.fsm.State().AllocByID(alloc.ID) + out, err := s1.fsm.State().AllocByID(ws, alloc.ID) if err != nil { t.Fatalf("err: %v", err) } @@ -141,7 +143,7 @@ func TestPlanApply_applyPlan(t *testing.T) { } // Check that our optimistic view is updated - if out, _ := snap.AllocByID(allocEvict.ID); out.DesiredStatus != structs.AllocDesiredStatusEvict { + if out, _ := snap.AllocByID(ws, allocEvict.ID); out.DesiredStatus != structs.AllocDesiredStatusEvict { t.Fatalf("bad: %#v", out) } @@ -155,7 +157,7 @@ func TestPlanApply_applyPlan(t *testing.T) { } // Lookup the allocation - out, err = s1.fsm.State().AllocByID(alloc.ID) + out, err = s1.fsm.State().AllocByID(ws, alloc.ID) if err != nil { t.Fatalf("err: %v", err) } @@ -167,7 +169,7 @@ func TestPlanApply_applyPlan(t *testing.T) { } // Lookup the allocation - out, err = s1.fsm.State().AllocByID(alloc2.ID) + out, err = s1.fsm.State().AllocByID(ws, alloc2.ID) if err != nil { t.Fatalf("err: %v", err) } diff --git a/nomad/system_endpoint_test.go b/nomad/system_endpoint_test.go index 91e5d51c9..14dada680 100644 --- a/nomad/system_endpoint_test.go +++ b/nomad/system_endpoint_test.go @@ -5,6 +5,7 @@ import ( "reflect" "testing" + memdb "github.com/hashicorp/go-memdb" "github.com/hashicorp/net-rpc-msgpackrpc" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" @@ -38,7 +39,8 @@ func TestSystemEndpoint_GarbageCollect(t *testing.T) { testutil.WaitForResult(func() (bool, error) { // Check if the job has been GC'd - exist, err := state.JobByID(job.ID) + ws := memdb.NewWatchSet() + exist, err := state.JobByID(ws, job.ID) if err != nil { return false, err } @@ -81,7 +83,8 @@ func TestSystemEndpoint_ReconcileSummaries(t *testing.T) { testutil.WaitForResult(func() (bool, error) { // Check if Nomad has reconciled the summary for the job - summary, err := state.JobSummaryByID(job.ID) + ws := memdb.NewWatchSet() + summary, err := state.JobSummaryByID(ws, job.ID) if err != nil { return false, err } diff --git a/nomad/worker_test.go b/nomad/worker_test.go index fea703ceb..4de1ddcf3 100644 --- a/nomad/worker_test.go +++ b/nomad/worker_test.go @@ -8,6 +8,7 @@ import ( "testing" "time" + memdb "github.com/hashicorp/go-memdb" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/scheduler" @@ -397,7 +398,8 @@ func TestWorker_UpdateEval(t *testing.T) { t.Fatalf("err: %v", err) } - out, err := s1.fsm.State().EvalByID(eval2.ID) + ws := memdb.NewWatchSet() + out, err := s1.fsm.State().EvalByID(ws, eval2.ID) if err != nil { t.Fatalf("err: %v", err) } @@ -443,7 +445,8 @@ func TestWorker_CreateEval(t *testing.T) { t.Fatalf("err: %v", err) } - out, err := s1.fsm.State().EvalByID(eval2.ID) + ws := memdb.NewWatchSet() + out, err := s1.fsm.State().EvalByID(ws, eval2.ID) if err != nil { t.Fatalf("err: %v", err) } @@ -512,7 +515,8 @@ func TestWorker_ReblockEval(t *testing.T) { } // Check that the eval was updated - eval, err := s1.fsm.State().EvalByID(eval2.ID) + ws := memdb.NewWatchSet() + eval, err := s1.fsm.State().EvalByID(ws, eval2.ID) if err != nil { t.Fatal(err) } diff --git a/scheduler/generic_sched_test.go b/scheduler/generic_sched_test.go index e0fdbaaf4..36ec50b0f 100644 --- a/scheduler/generic_sched_test.go +++ b/scheduler/generic_sched_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + memdb "github.com/hashicorp/go-memdb" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" ) @@ -67,7 +68,8 @@ func TestServiceSched_JobRegister(t *testing.T) { } // Lookup the allocations by JobID - out, err := h.State.AllocsByJob(job.ID, false) + ws := memdb.NewWatchSet() + out, err := h.State.AllocsByJob(ws, job.ID, false) noErr(t, err) // Ensure all allocations placed @@ -215,7 +217,8 @@ func TestServiceSched_JobRegister_DiskConstraints(t *testing.T) { } // Lookup the allocations by JobID - out, err := h.State.AllocsByJob(job.ID, false) + ws := memdb.NewWatchSet() + out, err := h.State.AllocsByJob(ws, job.ID, false) noErr(t, err) // Ensure only one allocation was placed @@ -270,7 +273,8 @@ func TestServiceSched_JobRegister_Annotate(t *testing.T) { } // Lookup the allocations by JobID - out, err := h.State.AllocsByJob(job.ID, false) + ws := memdb.NewWatchSet() + out, err := h.State.AllocsByJob(ws, job.ID, false) noErr(t, err) // Ensure all allocations placed @@ -335,7 +339,8 @@ func TestServiceSched_JobRegister_CountZero(t *testing.T) { } // Lookup the allocations by JobID - out, err := h.State.AllocsByJob(job.ID, false) + ws := memdb.NewWatchSet() + out, err := h.State.AllocsByJob(ws, job.ID, false) noErr(t, err) // Ensure no allocations placed @@ -561,7 +566,8 @@ func TestServiceSched_JobRegister_FeasibleAndInfeasibleTG(t *testing.T) { } // Ensure two allocations placed - out, err := h.State.AllocsByJob(job.ID, false) + ws := memdb.NewWatchSet() + out, err := h.State.AllocsByJob(ws, job.ID, false) noErr(t, err) if len(out) != 2 { t.Fatalf("bad: %#v", out) @@ -680,7 +686,8 @@ func TestServiceSched_Plan_Partial_Progress(t *testing.T) { } // Lookup the allocations by JobID - out, err := h.State.AllocsByJob(job.ID, false) + ws := memdb.NewWatchSet() + out, err := h.State.AllocsByJob(ws, job.ID, false) noErr(t, err) // Ensure only one allocations placed @@ -800,7 +807,8 @@ func TestServiceSched_EvaluateBlockedEval_Finished(t *testing.T) { } // Lookup the allocations by JobID - out, err := h.State.AllocsByJob(job.ID, false) + ws := memdb.NewWatchSet() + out, err := h.State.AllocsByJob(ws, job.ID, false) noErr(t, err) // Ensure all allocations placed @@ -908,7 +916,8 @@ func TestServiceSched_JobModify(t *testing.T) { } // Lookup the allocations by JobID - out, err := h.State.AllocsByJob(job.ID, false) + ws := memdb.NewWatchSet() + out, err := h.State.AllocsByJob(ws, job.ID, false) noErr(t, err) // Ensure all allocations placed @@ -999,7 +1008,8 @@ func TestServiceSched_JobModify_IncrCount_NodeLimit(t *testing.T) { } // Lookup the allocations by JobID - out, err := h.State.AllocsByJob(job.ID, false) + ws := memdb.NewWatchSet() + out, err := h.State.AllocsByJob(ws, job.ID, false) noErr(t, err) // Ensure all allocations placed @@ -1095,7 +1105,8 @@ func TestServiceSched_JobModify_CountZero(t *testing.T) { } // Lookup the allocations by JobID - out, err := h.State.AllocsByJob(job.ID, false) + ws := memdb.NewWatchSet() + out, err := h.State.AllocsByJob(ws, job.ID, false) noErr(t, err) // Ensure all allocations placed @@ -1283,7 +1294,8 @@ func TestServiceSched_JobModify_InPlace(t *testing.T) { } // Lookup the allocations by JobID - out, err := h.State.AllocsByJob(job.ID, false) + ws := memdb.NewWatchSet() + out, err := h.State.AllocsByJob(ws, job.ID, false) noErr(t, err) // Ensure all allocations placed @@ -1350,7 +1362,8 @@ func TestServiceSched_JobDeregister(t *testing.T) { } // Lookup the allocations by JobID - out, err := h.State.AllocsByJob(job.ID, false) + ws := memdb.NewWatchSet() + out, err := h.State.AllocsByJob(ws, job.ID, false) noErr(t, err) // Ensure that the job field on the allocation is still populated @@ -1401,8 +1414,9 @@ func TestServiceSched_NodeDown(t *testing.T) { noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs)) // Mark some allocs as running + ws := memdb.NewWatchSet() for i := 0; i < 4; i++ { - out, _ := h.State.AllocByID(allocs[i].ID) + out, _ := h.State.AllocByID(ws, allocs[i].ID) out.ClientStatus = structs.AllocClientStatusRunning noErr(t, h.State.UpdateAllocsFromClient(h.NextIndex(), []*structs.Allocation{out})) } @@ -1468,8 +1482,9 @@ func TestServiceSched_NodeUpdate(t *testing.T) { noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs)) // Mark some allocs as running + ws := memdb.NewWatchSet() for i := 0; i < 4; i++ { - out, _ := h.State.AllocByID(allocs[i].ID) + out, _ := h.State.AllocByID(ws, allocs[i].ID) out.ClientStatus = structs.AllocClientStatusRunning noErr(t, h.State.UpdateAllocsFromClient(h.NextIndex(), []*structs.Allocation{out})) } @@ -1560,7 +1575,8 @@ func TestServiceSched_NodeDrain(t *testing.T) { } // Lookup the allocations by JobID - out, err := h.State.AllocsByJob(job.ID, false) + ws := memdb.NewWatchSet() + out, err := h.State.AllocsByJob(ws, job.ID, false) noErr(t, err) // Ensure all allocations placed @@ -1829,7 +1845,8 @@ func TestServiceSched_RetryLimit(t *testing.T) { } // Lookup the allocations by JobID - out, err := h.State.AllocsByJob(job.ID, false) + ws := memdb.NewWatchSet() + out, err := h.State.AllocsByJob(ws, job.ID, false) noErr(t, err) // Ensure no allocations placed @@ -1882,7 +1899,8 @@ func TestBatchSched_Run_CompleteAlloc(t *testing.T) { } // Lookup the allocations by JobID - out, err := h.State.AllocsByJob(job.ID, false) + ws := memdb.NewWatchSet() + out, err := h.State.AllocsByJob(ws, job.ID, false) noErr(t, err) // Ensure no allocations placed @@ -1935,7 +1953,8 @@ func TestBatchSched_Run_DrainedAlloc(t *testing.T) { } // Lookup the allocations by JobID - out, err := h.State.AllocsByJob(job.ID, false) + ws := memdb.NewWatchSet() + out, err := h.State.AllocsByJob(ws, job.ID, false) noErr(t, err) // Ensure a replacement alloc was placed. @@ -1987,7 +2006,8 @@ func TestBatchSched_Run_FailedAlloc(t *testing.T) { } // Lookup the allocations by JobID - out, err := h.State.AllocsByJob(job.ID, false) + ws := memdb.NewWatchSet() + out, err := h.State.AllocsByJob(ws, job.ID, false) noErr(t, err) // Ensure a replacement alloc was placed. @@ -2105,7 +2125,8 @@ func TestBatchSched_ReRun_SuccessfullyFinishedAlloc(t *testing.T) { } // Lookup the allocations by JobID - out, err := h.State.AllocsByJob(job.ID, false) + ws := memdb.NewWatchSet() + out, err := h.State.AllocsByJob(ws, job.ID, false) noErr(t, err) // Ensure no replacement alloc was placed. diff --git a/scheduler/system_sched_test.go b/scheduler/system_sched_test.go index 438014e2d..313d573d3 100644 --- a/scheduler/system_sched_test.go +++ b/scheduler/system_sched_test.go @@ -6,6 +6,7 @@ import ( "testing" "time" + memdb "github.com/hashicorp/go-memdb" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" ) @@ -58,7 +59,8 @@ func TestSystemSched_JobRegister(t *testing.T) { } // Lookup the allocations by JobID - out, err := h.State.AllocsByJob(job.ID, false) + ws := memdb.NewWatchSet() + out, err := h.State.AllocsByJob(ws, job.ID, false) noErr(t, err) // Ensure all allocations placed @@ -182,7 +184,8 @@ func TestSystemSched_JobRegister_EphemeralDiskConstraint(t *testing.T) { } // Lookup the allocations by JobID - out, err := h.State.AllocsByJob(job.ID, false) + ws := memdb.NewWatchSet() + out, err := h.State.AllocsByJob(ws, job.ID, false) noErr(t, err) // Ensure all allocations placed @@ -205,7 +208,7 @@ func TestSystemSched_JobRegister_EphemeralDiskConstraint(t *testing.T) { t.Fatalf("err: %v", err) } - out, err = h1.State.AllocsByJob(job1.ID, false) + out, err = h1.State.AllocsByJob(ws, job1.ID, false) noErr(t, err) if len(out) != 0 { t.Fatalf("bad: %#v", out) @@ -319,7 +322,8 @@ func TestSystemSched_JobRegister_Annotate(t *testing.T) { } // Lookup the allocations by JobID - out, err := h.State.AllocsByJob(job.ID, false) + ws := memdb.NewWatchSet() + out, err := h.State.AllocsByJob(ws, job.ID, false) noErr(t, err) // Ensure all allocations placed @@ -430,7 +434,8 @@ func TestSystemSched_JobRegister_AddNode(t *testing.T) { } // Lookup the allocations by JobID - out, err := h.State.AllocsByJob(job.ID, false) + ws := memdb.NewWatchSet() + out, err := h.State.AllocsByJob(ws, job.ID, false) noErr(t, err) // Ensure all allocations placed @@ -558,7 +563,8 @@ func TestSystemSched_JobModify(t *testing.T) { } // Lookup the allocations by JobID - out, err := h.State.AllocsByJob(job.ID, false) + ws := memdb.NewWatchSet() + out, err := h.State.AllocsByJob(ws, job.ID, false) noErr(t, err) // Ensure all allocations placed @@ -746,7 +752,8 @@ func TestSystemSched_JobModify_InPlace(t *testing.T) { } // Lookup the allocations by JobID - out, err := h.State.AllocsByJob(job.ID, false) + ws := memdb.NewWatchSet() + out, err := h.State.AllocsByJob(ws, job.ID, false) noErr(t, err) // Ensure all allocations placed @@ -822,7 +829,8 @@ func TestSystemSched_JobDeregister(t *testing.T) { } // Lookup the allocations by JobID - out, err := h.State.AllocsByJob(job.ID, false) + ws := memdb.NewWatchSet() + out, err := h.State.AllocsByJob(ws, job.ID, false) noErr(t, err) // Ensure no remaining allocations @@ -1094,7 +1102,8 @@ func TestSystemSched_RetryLimit(t *testing.T) { } // Lookup the allocations by JobID - out, err := h.State.AllocsByJob(job.ID, false) + ws := memdb.NewWatchSet() + out, err := h.State.AllocsByJob(ws, job.ID, false) noErr(t, err) // Ensure no allocations placed