disconnected clients: Add reconnect task event (#12133)

* Add TaskClientReconnectedEvent constant
* Add allocRunner.Reconnect function to manage task state manually
* Remove server-side push
Derek Strickland
2022-03-02 05:47:26 -05:00
committed by DerekStrickland
parent 97ce949f0e
commit 35752655b0
10 changed files with 263 additions and 6 deletions
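
Before the diff, here is a condensed sketch of the flow this commit implements: the server marks a disconnected client's allocation unknown and only logs the reconnect update; when the client reconnects and pulls its allocations, it calls the new allocRunner.Reconnect, which appends a TaskClientReconnected event to each task, adopts the server-side indexes, persists the allocation locally, and pushes the resulting client state back to the server. The Go below is a self-contained, simplified stand-in for orientation only; the type and field names are illustrative, not Nomad's real structs.

// reconnect_sketch.go: simplified stand-in for the reconnect flow, not code from this commit.
package main

import "fmt"

// Mirrors the new constant added to structs.go.
const TaskClientReconnected = "Reconnected"

type TaskEvent struct{ Type string }

type Allocation struct {
	ID               string
	ClientStatus     string
	AllocModifyIndex uint64
	Events           []TaskEvent
}

type allocRunner struct{ alloc *Allocation }

// Reconnect mirrors the new allocRunner.Reconnect: append a reconnect event,
// adopt the server-side indexes, then persist locally and report back to the
// server (elided here).
func (ar *allocRunner) Reconnect(update *Allocation) error {
	ar.alloc.Events = append(ar.alloc.Events, TaskEvent{Type: TaskClientReconnected})
	ar.alloc.AllocModifyIndex = update.AllocModifyIndex
	// ... PutAllocation (local state store) and AllocStateUpdated (server sync)
	// would happen here in the real implementation.
	return nil
}

func main() {
	ar := &allocRunner{alloc: &Allocation{ID: "a1", ClientStatus: "running", AllocModifyIndex: 10}}

	// The server marked the alloc unknown while the client was disconnected;
	// on the next pull the client sees the newer index and reconnects instead
	// of replacing the alloc (simplified from updateAlloc in the diff below).
	update := &Allocation{ID: "a1", ClientStatus: "unknown", AllocModifyIndex: 20}
	if update.ClientStatus == "unknown" && update.AllocModifyIndex > ar.alloc.AllocModifyIndex {
		if err := ar.Reconnect(update); err != nil {
			fmt.Println("error reconnecting alloc:", err)
		}
	}
	fmt.Printf("%+v\n", *ar.alloc)
}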

View File

@@ -971,6 +971,7 @@ const (
TaskRestartSignal = "Restart Signaled"
TaskLeaderDead = "Leader Task Dead"
TaskBuildingTaskDir = "Building Task Directory"
TaskClientReconnected = "Reconnected"
)
// TaskEvent is an event that affects the state of a task and contains meta-data

View File

@@ -784,6 +784,17 @@ func (ar *allocRunner) NetworkStatus() *structs.AllocNetworkStatus {
return ar.state.NetworkStatus.Copy()
}
// setIndexes is a helper for forcing a set of server side indexes
// on the alloc runner. This is used during reconnect when the task
// has been marked unknown by the server.
func (ar *allocRunner) setIndexes(update *structs.Allocation) {
ar.allocLock.Lock()
defer ar.allocLock.Unlock()
ar.alloc.AllocModifyIndex = update.AllocModifyIndex
ar.alloc.ModifyIndex = update.ModifyIndex
ar.alloc.ModifyTime = update.ModifyTime
}
// AllocState returns a copy of allocation state including a snapshot of task
// states.
func (ar *allocRunner) AllocState() *state.State {
@@ -1240,6 +1251,43 @@ func (ar *allocRunner) Signal(taskName, signal string) error {
return err.ErrorOrNil()
}
// Reconnect logs a reconnect event for each task in the allocation and syncs the current alloc state with the server.
func (ar *allocRunner) Reconnect(update *structs.Allocation) (err error) {
ar.logger.Trace("reconnecting alloc", "alloc_id", update.ID, "alloc_modify_index", update.AllocModifyIndex)
event := structs.NewTaskEvent(structs.TaskClientReconnected)
for _, tr := range ar.tasks {
tr.AppendEvent(event)
}
// Update the client alloc with the server-side indexes.
ar.setIndexes(update)
// Calculate alloc state to get the final state with the new events.
// Cannot rely on AllocStates as it won't recompute TaskStates once they are set.
states := make(map[string]*structs.TaskState, len(ar.tasks))
for name, tr := range ar.tasks {
states[name] = tr.TaskState()
}
// Build the client allocation
alloc := ar.clientAlloc(states)
// Update the client state store.
err = ar.stateUpdater.PutAllocation(alloc)
if err != nil {
return
}
// Update the server.
ar.stateUpdater.AllocStateUpdated(alloc)
// Broadcast client alloc to listeners.
err = ar.allocBroadcaster.Send(alloc)
return
}
func (ar *allocRunner) GetTaskExecHandler(taskName string) drivermanager.TaskExecHandler {
tr, ok := ar.tasks[taskName]
if !ok {

View File

@@ -1,6 +1,7 @@
package allocrunner
import (
"errors"
"fmt"
"io/ioutil"
"os"
@@ -1575,3 +1576,186 @@ func TestAllocRunner_PersistState_Destroyed(t *testing.T) {
require.NoError(t, err)
require.Nil(t, ts)
}
func TestAllocRunner_Reconnect(t *testing.T) {
t.Parallel()
type tcase struct {
clientStatus string
taskState string
taskEvent *structs.TaskEvent
}
tcases := []tcase{
{
structs.AllocClientStatusRunning,
structs.TaskStateRunning,
structs.NewTaskEvent(structs.TaskStarted),
},
{
structs.AllocClientStatusComplete,
structs.TaskStateDead,
structs.NewTaskEvent(structs.TaskTerminated),
},
{
structs.AllocClientStatusFailed,
structs.TaskStateDead,
structs.NewTaskEvent(structs.TaskDriverFailure).SetFailsTask(),
},
{
structs.AllocClientStatusPending,
structs.TaskStatePending,
structs.NewTaskEvent(structs.TaskReceived),
},
}
for _, tc := range tcases {
t.Run(tc.clientStatus, func(t *testing.T) {
// create a running alloc
alloc := mock.BatchAlloc()
// Ensure task takes some time
task := alloc.Job.TaskGroups[0].Tasks[0]
task.Driver = "mock_driver"
task.Config["run_for"] = "30s"
conf, cleanup := testAllocRunnerConfig(t, alloc)
defer cleanup()
ar, err := NewAllocRunner(conf)
require.NoError(t, err)
defer destroy(ar)
go ar.Run()
for _, taskRunner := range ar.tasks {
taskRunner.UpdateState(tc.taskState, tc.taskEvent)
}
// Reconnect requires the server-side copy of the alloc that was marked
// unknown while the client was disconnected.
update := ar.Alloc().Copy()
update.ClientStatus = structs.AllocClientStatusUnknown
err = ar.Reconnect(update)
require.NoError(t, err)
require.Equal(t, tc.clientStatus, ar.AllocState().ClientStatus)
found := false
updater := conf.StateUpdater.(*MockStateUpdater)
var last *structs.Allocation
testutil.WaitForResult(func() (bool, error) {
last = updater.Last()
if last == nil {
return false, errors.New("last update nil")
}
states := last.TaskStates
for _, s := range states {
for _, e := range s.Events {
if e.Type == structs.TaskClientReconnected {
found = true
return true, nil
}
}
}
return false, errors.New("no reconnect event found")
}, func(err error) {
require.NoError(t, err)
})
require.True(t, found, "no reconnect event found")
})
}
}
func TestAllocRunner_MaybeHasPendingReconnect(t *testing.T) {
t.Parallel()
type tcase struct {
name string
timestamp int64
expectedDiff int
}
tcases := []tcase{
{
"should guard now",
time.Now().UnixNano(),
1,
},
{
"should guard 3 seconds",
time.Now().Add(-(3 * time.Second)).UnixNano(),
1,
},
{
"should not guard 6 seconds",
time.Now().Add(-(6 * time.Second)).UnixNano(),
2,
},
}
for _, tc := range tcases {
t.Run(tc.name, func(t *testing.T) {
alloc := mock.BatchAlloc()
task := alloc.Job.TaskGroups[0].Tasks[0]
task.Driver = "mock_driver"
task.Config["run_for"] = "30s"
conf, cleanup := testAllocRunnerConfig(t, alloc)
defer cleanup()
ar, err := NewAllocRunner(conf)
require.NoError(t, err)
defer destroy(ar)
go ar.Run()
reconnectEvent := structs.NewTaskEvent(structs.TaskClientReconnected)
reconnectEvent.Time = tc.timestamp
for _, tr := range ar.tasks {
tr.EmitEvent(reconnectEvent)
}
updater := conf.StateUpdater.(*MockStateUpdater)
// get a copy of the first states so that we can compare lengths to
// determine how many events were appended.
var firstStates map[string]*structs.TaskState
testutil.WaitForResult(func() (bool, error) {
last := updater.Last()
if last == nil {
return false, errors.New("last update nil")
}
states := last.TaskStates
for _, s := range states {
for _, e := range s.Events {
if e.Type == structs.TaskClientReconnected {
firstStates = states
return true, nil
}
}
}
return false, errors.New("no reconnect event found")
}, func(err error) {
require.NoError(t, err)
})
// Reconnect with a server-side copy of the alloc marked unknown.
update := ar.Alloc().Copy()
update.ClientStatus = structs.AllocClientStatusUnknown
err = ar.Reconnect(update)
require.NoError(t, err)
testutil.WaitForResult(func() (bool, error) {
last := updater.Last()
if last == nil {
return false, errors.New("last update nil")
}
for k, taskState := range last.TaskStates {
if len(taskState.Events) != len(firstStates[k].Events)+tc.expectedDiff {
return false, fmt.Errorf("expected %d reconnect events", tc.expectedDiff)
}
}
return true, nil
}, func(err error) {
require.NoError(t, err)
})
})
}
}

View File

@@ -36,6 +36,11 @@ func (m *MockStateUpdater) AllocStateUpdated(alloc *structs.Allocation) {
m.mu.Unlock()
}
// PutAllocation satisfies the AllocStateHandler interface.
func (m *MockStateUpdater) PutAllocation(alloc *structs.Allocation) (err error) {
return
}
// Last returns a copy of the last alloc (or nil) update. Safe for concurrent
// access with updates.
func (m *MockStateUpdater) Last() *structs.Allocation {

View File

@@ -158,6 +158,7 @@ type AllocRunner interface {
RestartTask(taskName string, taskEvent *structs.TaskEvent) error
RestartAll(taskEvent *structs.TaskEvent) error
Reconnect(update *structs.Allocation) error
GetTaskExecHandler(taskName string) drivermanager.TaskExecHandler
GetTaskDriverCapabilities(taskName string) (*drivers.Capabilities, error)
@@ -1978,6 +1979,11 @@ func (c *Client) AllocStateUpdated(alloc *structs.Allocation) {
}
}
// PutAllocation stores an allocation or returns an error if it could not be stored.
func (c *Client) PutAllocation(alloc *structs.Allocation) error {
return c.stateDB.PutAllocation(alloc)
}
// allocSync is a long lived function that batches allocation updates to the
// server.
func (c *Client) allocSync() {
@@ -2422,9 +2428,11 @@ func (c *Client) updateAlloc(update *structs.Allocation) {
// Reconnect unknown allocations
if update.ClientStatus == structs.AllocClientStatusUnknown && update.AllocModifyIndex > ar.Alloc().AllocModifyIndex {
update.ClientStatus = ar.AllocState().ClientStatus
update.ClientDescription = ar.AllocState().ClientDescription
c.AllocStateUpdated(update)
err = ar.Reconnect(update)
if err != nil {
c.logger.Error("error reconnecting alloc", "alloc_id", update.ID, "alloc_modify_index", update.AllocModifyIndex, "err", err)
}
return
}
// Update local copy of alloc

View File

@@ -14,6 +14,9 @@ type AllocStateHandler interface {
// AllocStateUpdated is used to emit an updated allocation. This allocation
// is stripped to only include client settable fields.
AllocStateUpdated(alloc *structs.Allocation)
// PutAllocation is used to persist an updated allocation in the local state store.
PutAllocation(*structs.Allocation) error
}
// DeviceStatsReporter gives access to the latest resource usage

View File

@@ -538,6 +538,8 @@ func buildDisplayMessage(event *api.TaskEvent) string {
desc = event.DriverMessage
case api.TaskLeaderDead:
desc = "Leader Task in Group dead"
case api.TaskClientReconnected:
desc = "Client reconnected"
default:
desc = event.Message
}

View File

@@ -7978,6 +7978,9 @@ const (
// TaskPluginHealthy indicates that a plugin managed by Nomad became healthy
TaskPluginHealthy = "Plugin became healthy"
// TaskClientReconnected indicates that the client running the task reconnected after a disconnect.
TaskClientReconnected = "Reconnected"
)
// TaskEvent is an event that affects the state of a task and contains meta-data
@@ -8189,6 +8192,8 @@ func (e *TaskEvent) PopulateEventDisplayMessage() {
desc = "Leader Task in Group dead"
case TaskMainDead:
desc = "Main tasks in the group died"
case TaskClientReconnected:
desc = "Client reconnected"
default:
desc = e.Message
}

View File

@@ -5961,7 +5961,8 @@ func TestTaskEventPopulate(t *testing.T) {
{NewTaskEvent(TaskSignaling).SetTaskSignal(os.Interrupt).SetTaskSignalReason("process interrupted"), "Task being sent signal interrupt: process interrupted"},
{NewTaskEvent(TaskRestartSignal), "Task signaled to restart"},
{NewTaskEvent(TaskRestartSignal).SetRestartReason("Chaos Monkey restarted it"), "Chaos Monkey restarted it"},
{NewTaskEvent(TaskDriverMessage).SetDriverMessage("YOLO"), "YOLO"},
{NewTaskEvent(TaskClientReconnected), "Client reconnected"},
{NewTaskEvent(TaskLeaderDead), "Leader Task in Group dead"},
{NewTaskEvent("Unknown Type, No message"), ""},
{NewTaskEvent("Unknown Type").SetMessage("Hello world"), "Hello world"},
}

View File

@@ -419,9 +419,9 @@ func (s *GenericScheduler) computeJobAllocs() error {
s.ctx.Plan().AppendAlloc(update, nil)
}
// Handle reconnect updates
// Log reconnect updates. They will be pulled by the client when it reconnects.
for _, update := range results.reconnectUpdates {
s.ctx.Plan().AppendAlloc(update, nil)
s.logger.Trace("reconnecting alloc", "alloc_id", update.ID, "alloc_modify_index", update.AllocModifyIndex)
}
// Nothing remaining to do if placement is not required