diff --git a/nomad/drainer/watch_jobs.go b/nomad/drainer/watch_jobs.go
index b5173c82a..fec7743e6 100644
--- a/nomad/drainer/watch_jobs.go
+++ b/nomad/drainer/watch_jobs.go
@@ -352,8 +352,9 @@ func handleTaskGroup(snap *state.StateSnapshot, batch bool, tg *structs.TaskGrou
 				return err
 			}
 
-			onDrainingNode = node.DrainStrategy != nil
-			drainingNodes[node.ID] = onDrainingNode
+			// Check if the node exists and whether it has a drain strategy
+			onDrainingNode = node != nil && node.DrainStrategy != nil
+			drainingNodes[alloc.NodeID] = onDrainingNode
 		}
 
 		// Check if the alloc should be considered migrated. A migrated
diff --git a/nomad/drainer/watch_jobs_test.go b/nomad/drainer/watch_jobs_test.go
index b44303a6c..ad0926c3c 100644
--- a/nomad/drainer/watch_jobs_test.go
+++ b/nomad/drainer/watch_jobs_test.go
@@ -7,6 +7,7 @@ import (
 
 	"github.com/hashicorp/nomad/helper"
 	"github.com/hashicorp/nomad/helper/testlog"
+	"github.com/hashicorp/nomad/helper/uuid"
 	"github.com/hashicorp/nomad/nomad/mock"
 	"github.com/hashicorp/nomad/nomad/state"
 	"github.com/hashicorp/nomad/nomad/structs"
@@ -663,3 +664,75 @@ func TestHandleTaskGroup_Migrations(t *testing.T) {
 	require.Empty(res.migrated)
 	require.True(res.done)
 }
+
+// This test asserts that handle task group works when an allocation is on a
+// garbage collected node
+func TestHandleTaskGroup_GarbageCollectedNode(t *testing.T) {
+	t.Parallel()
+	require := require.New(t)
+
+	// Create a draining node
+	state := state.TestStateStore(t)
+	n := mock.Node()
+	n.DrainStrategy = &structs.DrainStrategy{
+		DrainSpec: structs.DrainSpec{
+			Deadline: 5 * time.Minute,
+		},
+		ForceDeadline: time.Now().Add(1 * time.Minute),
+	}
+	require.Nil(state.UpsertNode(100, n))
+
+	job := mock.Job()
+	require.Nil(state.UpsertJob(101, job))
+
+	// Create 10 done allocs
+	var allocs []*structs.Allocation
+	for i := 0; i < 10; i++ {
+		a := mock.Alloc()
+		a.Job = job
+		a.TaskGroup = job.TaskGroups[0].Name
+		a.NodeID = n.ID
+		a.DeploymentStatus = &structs.AllocDeploymentStatus{
+			Healthy: helper.BoolToPtr(false),
+		}
+
+		if i%2 == 0 {
+			a.DesiredStatus = structs.AllocDesiredStatusStop
+		} else {
+			a.ClientStatus = structs.AllocClientStatusFailed
+		}
+		allocs = append(allocs, a)
+	}
+
+	// Make the first one be on a GC'd node
+	allocs[0].NodeID = uuid.Generate()
+	require.Nil(state.UpsertAllocs(102, allocs))
+
+	snap, err := state.Snapshot()
+	require.Nil(err)
+
+	// Handle before and after indexes as both service and batch
+	res := newJobResult()
+	require.Nil(handleTaskGroup(snap, false, job.TaskGroups[0], allocs, 101, res))
+	require.Empty(res.drain)
+	require.Len(res.migrated, 9)
+	require.True(res.done)
+
+	res = newJobResult()
+	require.Nil(handleTaskGroup(snap, true, job.TaskGroups[0], allocs, 101, res))
+	require.Empty(res.drain)
+	require.Len(res.migrated, 9)
+	require.True(res.done)
+
+	res = newJobResult()
+	require.Nil(handleTaskGroup(snap, false, job.TaskGroups[0], allocs, 103, res))
+	require.Empty(res.drain)
+	require.Empty(res.migrated)
+	require.True(res.done)
+
+	res = newJobResult()
+	require.Nil(handleTaskGroup(snap, true, job.TaskGroups[0], allocs, 103, res))
+	require.Empty(res.drain)
+	require.Empty(res.migrated)
+	require.True(res.done)
+}
diff --git a/nomad/drainer_int_test.go b/nomad/drainer_int_test.go
index 315a538f7..61a6c1587 100644
--- a/nomad/drainer_int_test.go
+++ b/nomad/drainer_int_test.go
@@ -11,6 +11,7 @@ import (
 	memdb "github.com/hashicorp/go-memdb"
 	msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
 	"github.com/hashicorp/nomad/helper"
+	"github.com/hashicorp/nomad/helper/uuid"
 	"github.com/hashicorp/nomad/nomad/mock"
 	"github.com/hashicorp/nomad/nomad/state"
 	"github.com/hashicorp/nomad/nomad/structs"
@@ -658,6 +659,171 @@ func TestDrainer_AllTypes_NoDeadline(t *testing.T) {
 	})
 }
 
+func TestDrainer_AllTypes_Deadline_GarbageCollectedNode(t *testing.T) {
+	t.Parallel()
+	require := require.New(t)
+	s1 := TestServer(t, nil)
+	defer s1.Shutdown()
+	codec := rpcClient(t, s1)
+	testutil.WaitForLeader(t, s1.RPC)
+
+	// Create two nodes, registering the second later
+	n1, n2 := mock.Node(), mock.Node()
+	nodeReg := &structs.NodeRegisterRequest{
+		Node:         n1,
+		WriteRequest: structs.WriteRequest{Region: "global"},
+	}
+	var nodeResp structs.NodeUpdateResponse
+	require.Nil(msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp))
+
+	// Create a service job that runs on just one
+	job := mock.Job()
+	job.TaskGroups[0].Count = 2
+	req := &structs.JobRegisterRequest{
+		Job: job,
+		WriteRequest: structs.WriteRequest{
+			Region:    "global",
+			Namespace: job.Namespace,
+		},
+	}
+
+	// Fetch the response
+	var resp structs.JobRegisterResponse
+	require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp))
+	require.NotZero(resp.Index)
+	job.CreateIndex = resp.JobModifyIndex
+
+	// Create a system job
+	sysjob := mock.SystemJob()
+	req = &structs.JobRegisterRequest{
+		Job: sysjob,
+		WriteRequest: structs.WriteRequest{
+			Region:    "global",
+			Namespace: job.Namespace,
+		},
+	}
+
+	// Fetch the response
+	require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp))
+	require.NotZero(resp.Index)
+	sysjob.CreateIndex = resp.JobModifyIndex
+
+	// Create a batch job
+	bjob := mock.BatchJob()
+	bjob.TaskGroups[0].Count = 2
+	req = &structs.JobRegisterRequest{
+		Job: bjob,
+		WriteRequest: structs.WriteRequest{
+			Region:    "global",
+			Namespace: job.Namespace,
+		},
+	}
+
+	// Fetch the response
+	require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp))
+	require.NotZero(resp.Index)
+	bjob.CreateIndex = resp.JobModifyIndex
+
+	// Wait for the allocations to be placed
+	state := s1.State()
+	testutil.WaitForResult(func() (bool, error) {
+		allocs, err := state.AllocsByNode(nil, n1.ID)
+		if err != nil {
+			return false, err
+		}
+		return len(allocs) == 5, fmt.Errorf("got %d allocs", len(allocs))
+	}, func(err error) {
+		t.Fatalf("err: %v", err)
+	})
+
+	// Create some old terminal allocs for each job that point at a non-existent
+	// node to simulate it being on a GC'd node.
+	var badAllocs []*structs.Allocation
+	for _, job := range []*structs.Job{job, sysjob, bjob} {
+		alloc := mock.Alloc()
+		alloc.Namespace = job.Namespace
+		alloc.Job = job
+		alloc.JobID = job.ID
+		alloc.NodeID = uuid.Generate()
+		alloc.TaskGroup = job.TaskGroups[0].Name
+		alloc.DesiredStatus = structs.AllocDesiredStatusStop
+		alloc.ClientStatus = structs.AllocClientStatusComplete
+		badAllocs = append(badAllocs, alloc)
+	}
+	require.NoError(state.UpsertAllocs(1, badAllocs))
+
+	// Create the second node
+	nodeReg = &structs.NodeRegisterRequest{
+		Node:         n2,
+		WriteRequest: structs.WriteRequest{Region: "global"},
+	}
+	require.Nil(msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp))
+
+	// Drain the node
+	drainReq := &structs.NodeUpdateDrainRequest{
+		NodeID: n1.ID,
+		DrainStrategy: &structs.DrainStrategy{
+			DrainSpec: structs.DrainSpec{
+				Deadline: 2 * time.Second,
+			},
+		},
+		WriteRequest: structs.WriteRequest{Region: "global"},
+	}
+	var drainResp structs.NodeDrainUpdateResponse
+	require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", drainReq, &drainResp))
+
+	// Wait for the allocs to be replaced
+	errCh := make(chan error, 2)
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	go allocPromoter(errCh, ctx, state, codec, n1.ID, s1.logger)
+	go allocPromoter(errCh, ctx, state, codec, n2.ID, s1.logger)
+
+	// Wait for the allocs to be stopped
+	var finalAllocs []*structs.Allocation
+	testutil.WaitForResult(func() (bool, error) {
+		if err := checkAllocPromoter(errCh); err != nil {
+			return false, err
+		}
+
+		var err error
+		finalAllocs, err = state.AllocsByNode(nil, n1.ID)
+		if err != nil {
+			return false, err
+		}
+		for _, alloc := range finalAllocs {
+			if alloc.DesiredStatus != structs.AllocDesiredStatusStop {
+				return false, fmt.Errorf("got desired status %v", alloc.DesiredStatus)
+			}
+		}
+		return true, nil
+	}, func(err error) {
+		t.Fatalf("err: %v", err)
+	})
+
+	// Check that the node drain is removed
+	testutil.WaitForResult(func() (bool, error) {
+		node, err := state.NodeByID(nil, n1.ID)
+		if err != nil {
+			return false, err
+		}
+		return node.DrainStrategy == nil, fmt.Errorf("has drain strategy still set")
+	}, func(err error) {
+		t.Fatalf("err: %v", err)
+	})
+
+	// Wait for the allocations to be placed on the other node
+	testutil.WaitForResult(func() (bool, error) {
+		allocs, err := state.AllocsByNode(nil, n2.ID)
+		if err != nil {
+			return false, err
+		}
+		return len(allocs) == 5, fmt.Errorf("got %d allocs", len(allocs))
+	}, func(err error) {
+		t.Fatalf("err: %v", err)
+	})
+}
+
 // Test that transistions to force drain work.
 func TestDrainer_Batch_TransitionToForce(t *testing.T) {
 	t.Parallel()