diff --git a/api/tasks.go b/api/tasks.go
index 5b58caf6a..82986085b 100644
--- a/api/tasks.go
+++ b/api/tasks.go
@@ -971,6 +971,7 @@ const (
 	TaskRestartSignal          = "Restart Signaled"
 	TaskLeaderDead             = "Leader Task Dead"
 	TaskBuildingTaskDir        = "Building Task Directory"
+	TaskClientReconnected      = "Reconnected"
 )
 
 // TaskEvent is an event that effects the state of a task and contains meta-data
diff --git a/client/allocrunner/alloc_runner.go b/client/allocrunner/alloc_runner.go
index fa01c618b..75d0250ad 100644
--- a/client/allocrunner/alloc_runner.go
+++ b/client/allocrunner/alloc_runner.go
@@ -784,6 +784,17 @@ func (ar *allocRunner) NetworkStatus() *structs.AllocNetworkStatus {
 	return ar.state.NetworkStatus.Copy()
 }
 
+// setIndexes is a helper for forcing a set of server side indexes
+// on the alloc runner. This is used during reconnect when the task
+// has been marked unknown by the server.
+func (ar *allocRunner) setIndexes(update *structs.Allocation) {
+	ar.allocLock.Lock()
+	defer ar.allocLock.Unlock()
+	ar.alloc.AllocModifyIndex = update.AllocModifyIndex
+	ar.alloc.ModifyIndex = update.ModifyIndex
+	ar.alloc.ModifyTime = update.ModifyTime
+}
+
 // AllocState returns a copy of allocation state including a snapshot of task
 // states.
 func (ar *allocRunner) AllocState() *state.State {
@@ -1240,6 +1251,43 @@ func (ar *allocRunner) Signal(taskName, signal string) error {
 	return err.ErrorOrNil()
 }
 
+// Reconnect logs a reconnect event for each task in the allocation and syncs the current alloc state with the server.
+func (ar *allocRunner) Reconnect(update *structs.Allocation) (err error) {
+	ar.logger.Trace("reconnecting alloc", "alloc_id", update.ID, "alloc_modify_index", update.AllocModifyIndex)
+
+	event := structs.NewTaskEvent(structs.TaskClientReconnected)
+	for _, tr := range ar.tasks {
+		tr.AppendEvent(event)
+	}
+
+	// Update the client alloc with the server client side indexes.
+	ar.setIndexes(update)
+
+	// Calculate alloc state to get the final state with the new events.
+	// Cannot rely on AllocStates as it won't recompute TaskStates once they are set.
+	states := make(map[string]*structs.TaskState, len(ar.tasks))
+	for name, tr := range ar.tasks {
+		states[name] = tr.TaskState()
+	}
+
+	// Build the client allocation
+	alloc := ar.clientAlloc(states)
+
+	// Update the client state store.
+	err = ar.stateUpdater.PutAllocation(alloc)
+	if err != nil {
+		return err
+	}
+
+	// Update the server.
+	ar.stateUpdater.AllocStateUpdated(alloc)
+
+	// Broadcast client alloc to listeners.
+	err = ar.allocBroadcaster.Send(alloc)
+
+	return err
+}
+
 func (ar *allocRunner) GetTaskExecHandler(taskName string) drivermanager.TaskExecHandler {
 	tr, ok := ar.tasks[taskName]
 	if !ok {
diff --git a/client/allocrunner/alloc_runner_test.go b/client/allocrunner/alloc_runner_test.go
index adf055b61..829ae236f 100644
--- a/client/allocrunner/alloc_runner_test.go
+++ b/client/allocrunner/alloc_runner_test.go
@@ -1,6 +1,7 @@
 package allocrunner
 
 import (
+	"errors"
 	"fmt"
 	"io/ioutil"
 	"os"
@@ -1575,3 +1576,186 @@ func TestAllocRunner_PersistState_Destroyed(t *testing.T) {
 	require.NoError(t, err)
 	require.Nil(t, ts)
 }
+
+func TestAllocRunner_Reconnect(t *testing.T) {
+	t.Parallel()
+
+	type tcase struct {
+		clientStatus string
+		taskState    string
+		taskEvent    *structs.TaskEvent
+	}
+	tcases := []tcase{
+		{
+			structs.AllocClientStatusRunning,
+			structs.TaskStateRunning,
+			structs.NewTaskEvent(structs.TaskStarted),
+		},
+		{
+			structs.AllocClientStatusComplete,
+			structs.TaskStateDead,
+			structs.NewTaskEvent(structs.TaskTerminated),
+		},
+		{
+			structs.AllocClientStatusFailed,
+			structs.TaskStateDead,
+			structs.NewTaskEvent(structs.TaskDriverFailure).SetFailsTask(),
+		},
+		{
+			structs.AllocClientStatusPending,
+			structs.TaskStatePending,
+			structs.NewTaskEvent(structs.TaskReceived),
+		},
+	}
+
+	for _, tc := range tcases {
+		t.Run(tc.clientStatus, func(t *testing.T) {
+			// create a running alloc
+			alloc := mock.BatchAlloc()
+
+			// Ensure task takes some time
+			task := alloc.Job.TaskGroups[0].Tasks[0]
+			task.Driver = "mock_driver"
+			task.Config["run_for"] = "30s"
+
+			conf, cleanup := testAllocRunnerConfig(t, alloc)
+			defer cleanup()
+
+			ar, err := NewAllocRunner(conf)
+			require.NoError(t, err)
+			defer destroy(ar)
+
+			go ar.Run()
+
+			for _, taskRunner := range ar.tasks {
+				taskRunner.UpdateState(tc.taskState, tc.taskEvent)
+			}
+
+			require.NoError(t, ar.Reconnect(ar.Alloc().Copy()))
+
+			require.Equal(t, tc.clientStatus, ar.AllocState().ClientStatus)
+
+			found := false
+
+			updater := conf.StateUpdater.(*MockStateUpdater)
+			var last *structs.Allocation
+			testutil.WaitForResult(func() (bool, error) {
+				last = updater.Last()
+				if last == nil {
+					return false, errors.New("last update nil")
+				}
+
+				states := last.TaskStates
+				for _, s := range states {
+					for _, e := range s.Events {
+						if e.Type == structs.TaskClientReconnected {
+							found = true
+							return true, nil
+						}
+					}
+				}
+
+				return false, errors.New("no reconnect event found")
+			}, func(err error) {
+				require.NoError(t, err)
+			})
+
+			require.True(t, found, "no reconnect event found")
+		})
+	}
+}
+
+func TestAllocRunner_MaybeHasPendingReconnect(t *testing.T) {
+	t.Parallel()
+
+	type tcase struct {
+		name         string
+		timestamp    int64
+		expectedDiff int
+	}
+	tcases := []tcase{
+		{
+			"should guard now",
+			time.Now().UnixNano(),
+			1,
+		},
+		{
+			"should guard 3 seconds",
+			time.Now().Add(-(3 * time.Second)).UnixNano(),
+			1,
+		},
+		{
+			"should not guard 6 seconds",
+			time.Now().Add(-(6 * time.Second)).UnixNano(),
+			2,
+		},
+	}
+
+	for _, tc := range tcases {
+		t.Run(tc.name, func(t *testing.T) {
+			alloc := mock.BatchAlloc()
+
+			task := alloc.Job.TaskGroups[0].Tasks[0]
+			task.Driver = "mock_driver"
+			task.Config["run_for"] = "30s"
+
+			conf, cleanup := testAllocRunnerConfig(t, alloc)
+			defer cleanup()
+
+			ar, err := NewAllocRunner(conf)
+			require.NoError(t, err)
+			defer destroy(ar)
+
+			go ar.Run()
+
+			reconnectEvent := structs.NewTaskEvent(structs.TaskClientReconnected)
+			reconnectEvent.Time = tc.timestamp
+			for _, tr := range ar.tasks {
+				tr.EmitEvent(reconnectEvent)
+			}
+
+			updater := conf.StateUpdater.(*MockStateUpdater)
+			// get a copy of the first states so that we can compare lengths to
+			// determine how many events were appended.
+			var firstStates map[string]*structs.TaskState
+			testutil.WaitForResult(func() (bool, error) {
+				last := updater.Last()
+				if last == nil {
+					return false, errors.New("last update nil")
+				}
+				states := last.TaskStates
+				for _, s := range states {
+					for _, e := range s.Events {
+						if e.Type == structs.TaskClientReconnected {
+							firstStates = states
+							return true, nil
+						}
+					}
+				}
+
+				return false, errors.New("no reconnect event found")
+			}, func(err error) {
+				require.NoError(t, err)
+			})
+
+			require.NoError(t, ar.Reconnect(ar.Alloc().Copy()))
+
+			testutil.WaitForResult(func() (bool, error) {
+				last := updater.Last()
+				if last == nil {
+					return false, errors.New("last update nil")
+				}
+
+				for k, taskState := range last.TaskStates {
+					if len(taskState.Events) != len(firstStates[k].Events)+tc.expectedDiff {
+						return false, fmt.Errorf("expected %d reconnect events", tc.expectedDiff)
+					}
+				}
+
+				return true, nil
+			}, func(err error) {
+				require.NoError(t, err)
+			})
+		})
+	}
+}
diff --git a/client/allocrunner/testing.go b/client/allocrunner/testing.go
index f4c5bc673..c369c4fe1 100644
--- a/client/allocrunner/testing.go
+++ b/client/allocrunner/testing.go
@@ -36,6 +36,11 @@ func (m *MockStateUpdater) AllocStateUpdated(alloc *structs.Allocation) {
 	m.mu.Unlock()
 }
 
+// PutAllocation satisfies the AllocStateHandler interface.
+func (m *MockStateUpdater) PutAllocation(alloc *structs.Allocation) (err error) {
+	return nil
+}
+
 // Last returns a copy of the last alloc (or nil) update. Safe for concurrent
 // access with updates.
 func (m *MockStateUpdater) Last() *structs.Allocation {
diff --git a/client/client.go b/client/client.go
index 6f3077b29..978abe563 100644
--- a/client/client.go
+++ b/client/client.go
@@ -158,6 +158,7 @@ type AllocRunner interface {
 
 	RestartTask(taskName string, taskEvent *structs.TaskEvent) error
 	RestartAll(taskEvent *structs.TaskEvent) error
+	Reconnect(update *structs.Allocation) error
 
 	GetTaskExecHandler(taskName string) drivermanager.TaskExecHandler
 	GetTaskDriverCapabilities(taskName string) (*drivers.Capabilities, error)
@@ -1978,6 +1979,11 @@ func (c *Client) AllocStateUpdated(alloc *structs.Allocation) {
 	}
 }
 
+// PutAllocation stores an allocation or returns an error if it could not be stored.
+func (c *Client) PutAllocation(alloc *structs.Allocation) error {
+	return c.stateDB.PutAllocation(alloc)
+}
+
 // allocSync is a long lived function that batches allocation updates to the
 // server.
 func (c *Client) allocSync() {
@@ -2422,9 +2428,11 @@ func (c *Client) updateAlloc(update *structs.Allocation) {
 
 	// Reconnect unknown allocations
 	if update.ClientStatus == structs.AllocClientStatusUnknown && update.AllocModifyIndex > ar.Alloc().AllocModifyIndex {
-		update.ClientStatus = ar.AllocState().ClientStatus
-		update.ClientDescription = ar.AllocState().ClientDescription
-		c.AllocStateUpdated(update)
+		err = ar.Reconnect(update)
+		if err != nil {
+			c.logger.Error("error reconnecting alloc", "alloc_id", update.ID, "alloc_modify_index", update.AllocModifyIndex, "err", err)
+		}
+		return
 	}
 
 	// Update local copy of alloc
diff --git a/client/interfaces/client.go b/client/interfaces/client.go
index 9715b4f01..35f28c321 100644
--- a/client/interfaces/client.go
+++ b/client/interfaces/client.go
@@ -14,6 +14,9 @@ type AllocStateHandler interface {
 	// AllocStateUpdated is used to emit an updated allocation. This allocation
 	// is stripped to only include client settable fields.
 	AllocStateUpdated(alloc *structs.Allocation)
+
+	// PutAllocation is used to persist an updated allocation in the local state store.
+	PutAllocation(*structs.Allocation) error
 }
 
 // DeviceStatsReporter gives access to the latest resource usage
diff --git a/command/alloc_status.go b/command/alloc_status.go
index edb649c0a..0d8b413c0 100644
--- a/command/alloc_status.go
+++ b/command/alloc_status.go
@@ -538,6 +538,8 @@ func buildDisplayMessage(event *api.TaskEvent) string {
 		desc = event.DriverMessage
 	case api.TaskLeaderDead:
 		desc = "Leader Task in Group dead"
+	case api.TaskClientReconnected:
+		desc = "Client reconnected"
 	default:
 		desc = event.Message
 	}
diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go
index a7413cd5c..136125462 100644
--- a/nomad/structs/structs.go
+++ b/nomad/structs/structs.go
@@ -7978,6 +7978,9 @@ const (
 
 	// TaskPluginHealthy indicates that a plugin managed by Nomad became healthy
 	TaskPluginHealthy = "Plugin became healthy"
+
+	// TaskClientReconnected indicates that the client running the task reconnected.
+	TaskClientReconnected = "Reconnected"
 )
 
 // TaskEvent is an event that effects the state of a task and contains meta-data
@@ -8189,6 +8192,8 @@ func (e *TaskEvent) PopulateEventDisplayMessage() {
 		desc = "Leader Task in Group dead"
 	case TaskMainDead:
 		desc = "Main tasks in the group died"
+	case TaskClientReconnected:
+		desc = "Client reconnected"
 	default:
 		desc = e.Message
 	}
diff --git a/nomad/structs/structs_test.go b/nomad/structs/structs_test.go
index f04b8b9a8..39c288b9b 100644
--- a/nomad/structs/structs_test.go
+++ b/nomad/structs/structs_test.go
@@ -5961,7 +5961,8 @@ func TestTaskEventPopulate(t *testing.T) {
 		{NewTaskEvent(TaskSignaling).SetTaskSignal(os.Interrupt).SetTaskSignalReason("process interrupted"), "Task being sent signal interrupt: process interrupted"},
 		{NewTaskEvent(TaskRestartSignal), "Task signaled to restart"},
 		{NewTaskEvent(TaskRestartSignal).SetRestartReason("Chaos Monkey restarted it"), "Chaos Monkey restarted it"},
-		{NewTaskEvent(TaskDriverMessage).SetDriverMessage("YOLO"), "YOLO"},
+		{NewTaskEvent(TaskClientReconnected), "Client reconnected"},
+		{NewTaskEvent(TaskLeaderDead), "Leader Task in Group dead"},
 		{NewTaskEvent("Unknown Type, No message"), ""},
 		{NewTaskEvent("Unknown Type").SetMessage("Hello world"), "Hello world"},
 	}
diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go
index e9489739f..5b10cb315 100644
--- a/scheduler/generic_sched.go
+++ b/scheduler/generic_sched.go
@@ -419,9 +419,9 @@ func (s *GenericScheduler) computeJobAllocs() error {
 		s.ctx.Plan().AppendAlloc(update, nil)
 	}
 
-	// Handle reconnect updates
+	// Log reconnect updates. They will be pulled by the client when it reconnects.
 	for _, update := range results.reconnectUpdates {
-		s.ctx.Plan().AppendAlloc(update, nil)
+		s.logger.Trace("reconnecting alloc", "alloc_id", update.ID, "alloc_modify_index", update.AllocModifyIndex)
 	}
 
 	// Nothing remaining to do if placement is not required