diff --git a/client/client.go b/client/client.go
index 0d322a7b4..6f3077b29 100644
--- a/client/client.go
+++ b/client/client.go
@@ -2420,6 +2420,13 @@ func (c *Client) updateAlloc(update *structs.Allocation) {
 		return
 	}
 
+	// Reconnect unknown allocations
+	if update.ClientStatus == structs.AllocClientStatusUnknown && update.AllocModifyIndex > ar.Alloc().AllocModifyIndex {
+		update.ClientStatus = ar.AllocState().ClientStatus
+		update.ClientDescription = ar.AllocState().ClientDescription
+		c.AllocStateUpdated(update)
+	}
+
 	// Update local copy of alloc
 	if err := c.stateDB.PutAllocation(update); err != nil {
 		c.logger.Error("error persisting updated alloc locally", "error", err, "alloc_id", update.ID)
diff --git a/client/client_test.go b/client/client_test.go
index 13cb392dc..d6ac20c17 100644
--- a/client/client_test.go
+++ b/client/client_test.go
@@ -1713,3 +1713,103 @@ func Test_verifiedTasks(t *testing.T) {
 		try(t, alloc(tgTasks), tasks, tasks, "")
 	})
 }
+
+// TestClient_ReconnectAllocs asserts that when the server sends the client an
+// allocation marked unknown with a newer AllocModifyIndex, the client keeps
+// its alloc runner and the server-side alloc ends up reported as running.
+func TestClient_ReconnectAllocs(t *testing.T) {
+	t.Parallel()
+
+	s1, _, cleanupS1 := testServer(t, nil)
+	defer cleanupS1()
+	testutil.WaitForLeader(t, s1.RPC)
+
+	c1, cleanupC1 := TestClient(t, func(c *config.Config) {
+		c.DevMode = false
+		c.RPCHandler = s1
+	})
+	defer cleanupC1()
+
+	waitTilNodeReady(c1, t)
+
+	job := mock.Job()
+
+	runningAlloc := mock.Alloc()
+	runningAlloc.NodeID = c1.Node().ID
+	runningAlloc.Job = job
+	runningAlloc.JobID = job.ID
+	runningAlloc.Job.TaskGroups[0].Tasks[0].Driver = "mock_driver"
+	runningAlloc.Job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
+		"run_for": "10s",
+	}
+	runningAlloc.ClientStatus = structs.AllocClientStatusPending
+
+	state := s1.State()
+	err := state.UpsertJob(structs.MsgTypeTestSetup, 100, job)
+	require.NoError(t, err)
+
+	err = state.UpsertJobSummary(101, mock.JobSummary(runningAlloc.JobID))
+	require.NoError(t, err)
+
+	err = state.UpsertAllocs(structs.MsgTypeTestSetup, 102, []*structs.Allocation{runningAlloc})
+	require.NoError(t, err)
+
+	// Ensure allocation gets upserted with desired status.
+	testutil.WaitForResult(func() (bool, error) {
+		upsertResult, stateErr := state.AllocByID(nil, runningAlloc.ID)
+		if stateErr != nil || upsertResult == nil {
+			return false, stateErr
+		}
+		return upsertResult.ClientStatus == structs.AllocClientStatusRunning, nil
+	}, func(err error) {
+		require.NoError(t, err, "allocation query failed")
+	})
+
+	// Create the unknown version of the alloc from the running one, update state
+	// to simulate what reconciler would have done, and then send to the client.
+	unknownAlloc, err := state.AllocByID(nil, runningAlloc.ID)
+	// Check the lookup itself before touching the result.
+	require.NoError(t, err)
+	require.NotNil(t, unknownAlloc, "expected alloc in server state")
+	require.Equal(t, structs.AllocClientStatusRunning, unknownAlloc.ClientStatus)
+	unknownAlloc.ClientStatus = structs.AllocClientStatusUnknown
+	err = state.UpsertAllocs(structs.MsgTypeTestSetup, runningAlloc.AllocModifyIndex+1, []*structs.Allocation{unknownAlloc})
+	require.NoError(t, err)
+
+	updates := &allocUpdates{
+		pulled: map[string]*structs.Allocation{
+			unknownAlloc.ID: unknownAlloc,
+		},
+	}
+
+	c1.runAllocs(updates)
+
+	invalid := false
+	var runner AllocRunner
+	var finalAlloc *structs.Allocation
+	// Ensure the allocation is not invalid on the client and has been marked
+	// running on the server with the new modify index
+	testutil.WaitForResult(func() (result bool, stateErr error) {
+		c1.allocLock.RLock()
+		runner = c1.allocs[unknownAlloc.ID]
+		_, invalid = c1.invalidAllocs[unknownAlloc.ID]
+		c1.allocLock.RUnlock()
+
+		finalAlloc, stateErr = state.AllocByID(nil, unknownAlloc.ID)
+		if stateErr != nil || finalAlloc == nil {
+			return false, stateErr
+		}
+		result = structs.AllocClientStatusRunning == finalAlloc.ClientStatus
+		return
+	}, func(err error) {
+		require.NoError(t, err, "allocation server check failed")
+	})
+
+	require.NotNil(t, runner, "expected alloc runner")
+	require.False(t, invalid, "expected alloc to not be marked invalid")
+	// The polling closure reports no error while the status is simply not
+	// running yet, so assert the status explicitly instead of relying on it.
+	require.NotNil(t, finalAlloc, "expected alloc in server state")
+	require.Equal(t, structs.AllocClientStatusRunning, finalAlloc.ClientStatus)
+	require.Equal(t, unknownAlloc.AllocModifyIndex, finalAlloc.AllocModifyIndex)
+}