mirror of
https://github.com/kemko/nomad.git
synced 2026-01-06 02:15:43 +03:00
client: reconnect unknown allocations and sync state
This commit is contained in:
@@ -2420,6 +2420,13 @@ func (c *Client) updateAlloc(update *structs.Allocation) {
|
||||
return
|
||||
}
|
||||
|
||||
// Reconnect unknown allocations
|
||||
if update.ClientStatus == structs.AllocClientStatusUnknown && update.AllocModifyIndex > ar.Alloc().AllocModifyIndex {
|
||||
update.ClientStatus = ar.AllocState().ClientStatus
|
||||
update.ClientDescription = ar.AllocState().ClientDescription
|
||||
c.AllocStateUpdated(update)
|
||||
}
|
||||
|
||||
// Update local copy of alloc
|
||||
if err := c.stateDB.PutAllocation(update); err != nil {
|
||||
c.logger.Error("error persisting updated alloc locally", "error", err, "alloc_id", update.ID)
|
||||
|
||||
@@ -1713,3 +1713,88 @@ func Test_verifiedTasks(t *testing.T) {
|
||||
try(t, alloc(tgTasks), tasks, tasks, "")
|
||||
})
|
||||
}
|
||||
|
||||
func TestClient_ReconnectAllocs(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
s1, _, cleanupS1 := testServer(t, nil)
|
||||
defer cleanupS1()
|
||||
testutil.WaitForLeader(t, s1.RPC)
|
||||
|
||||
c1, cleanupC1 := TestClient(t, func(c *config.Config) {
|
||||
c.DevMode = false
|
||||
c.RPCHandler = s1
|
||||
})
|
||||
defer cleanupC1()
|
||||
|
||||
waitTilNodeReady(c1, t)
|
||||
|
||||
job := mock.Job()
|
||||
|
||||
runningAlloc := mock.Alloc()
|
||||
runningAlloc.NodeID = c1.Node().ID
|
||||
runningAlloc.Job = job
|
||||
runningAlloc.JobID = job.ID
|
||||
runningAlloc.Job.TaskGroups[0].Tasks[0].Driver = "mock_driver"
|
||||
runningAlloc.Job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
|
||||
"run_for": "10s",
|
||||
}
|
||||
runningAlloc.ClientStatus = structs.AllocClientStatusPending
|
||||
|
||||
state := s1.State()
|
||||
err := state.UpsertJob(structs.MsgTypeTestSetup, 100, job)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = state.UpsertJobSummary(101, mock.JobSummary(runningAlloc.JobID))
|
||||
require.NoError(t, err)
|
||||
|
||||
err = state.UpsertAllocs(structs.MsgTypeTestSetup, 102, []*structs.Allocation{runningAlloc})
|
||||
require.NoError(t, err)
|
||||
|
||||
// Ensure allocation gets upserted with desired status.
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
upsertResult, stateErr := state.AllocByID(nil, runningAlloc.ID)
|
||||
return upsertResult.ClientStatus == structs.AllocClientStatusRunning, stateErr
|
||||
}, func(err error) {
|
||||
require.NoError(t, err, "allocation query failed")
|
||||
})
|
||||
|
||||
// Create the unknown version of the alloc from the running one, update state
|
||||
// to simulate what reconciler would have done, and then send to the client.
|
||||
unknownAlloc, err := state.AllocByID(nil, runningAlloc.ID)
|
||||
require.Equal(t, structs.AllocClientStatusRunning, unknownAlloc.ClientStatus)
|
||||
require.NoError(t, err)
|
||||
unknownAlloc.ClientStatus = structs.AllocClientStatusUnknown
|
||||
err = state.UpsertAllocs(structs.MsgTypeTestSetup, runningAlloc.AllocModifyIndex+1, []*structs.Allocation{unknownAlloc})
|
||||
require.NoError(t, err)
|
||||
|
||||
updates := &allocUpdates{
|
||||
pulled: map[string]*structs.Allocation{
|
||||
unknownAlloc.ID: unknownAlloc,
|
||||
},
|
||||
}
|
||||
|
||||
c1.runAllocs(updates)
|
||||
|
||||
invalid := false
|
||||
var runner AllocRunner
|
||||
var finalAlloc *structs.Allocation
|
||||
// Ensure the allocation is not invalid on the client and has been marked
|
||||
// running on the server with the new modify index
|
||||
testutil.WaitForResult(func() (result bool, stateErr error) {
|
||||
c1.allocLock.RLock()
|
||||
runner = c1.allocs[unknownAlloc.ID]
|
||||
_, invalid = c1.invalidAllocs[unknownAlloc.ID]
|
||||
c1.allocLock.RUnlock()
|
||||
|
||||
finalAlloc, stateErr = state.AllocByID(nil, unknownAlloc.ID)
|
||||
result = structs.AllocClientStatusRunning == finalAlloc.ClientStatus
|
||||
return
|
||||
}, func(err error) {
|
||||
require.NoError(t, err, "allocation server check failed")
|
||||
})
|
||||
|
||||
require.NotNil(t, runner, "expected alloc runner")
|
||||
require.False(t, invalid, "expected alloc to not be marked invalid")
|
||||
require.Equal(t, unknownAlloc.AllocModifyIndex, finalAlloc.AllocModifyIndex)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user