mirror of
https://github.com/kemko/nomad.git
synced 2026-01-07 10:55:42 +03:00
implement alloc runner task restart hook
Most allocation hooks don't need to know when a single task within the allocation is restarted. The check watcher for group services triggers the alloc runner to restart all tasks, but the alloc runner's `Restart` method doesn't trigger any of the alloc hooks, including the group service hook. The result is that after the first time a check triggers a restart, we'll never restart the tasks of an allocation again. This commit adds a `RunnerTaskRestartHook` interface so that alloc runner hooks can act if a task within the alloc is restarted. The only implementation is in the group service hook, which will force a re-registration of the alloc's services and fix check restarts.
This commit is contained in:
@@ -2,6 +2,7 @@
|
||||
|
||||
BUG FIXES:
|
||||
|
||||
* consul: Fixed a bug where failing tasks with group services would only cause the allocation to restart once instead of respecting the `restart` field. [[GH-9869](https://github.com/hashicorp/nomad/issues/9869)]
|
||||
* consul/connect: Fixed a bug where the gateway proxy connection default timeout was not set [[GH-9851](https://github.com/hashicorp/nomad/pull/9851)]
|
||||
* consul/connect: Fixed a bug preventing more than one connect gateway per Nomad client [[GH-9849](https://github.com/hashicorp/nomad/pull/9849)]
|
||||
* scheduler: Fixed a bug where shared ports were not persisted during inplace updates for service jobs. [[GH-9830](https://github.com/hashicorp/nomad/issues/9830)]
|
||||
@@ -17,7 +18,7 @@ IMPROVEMENTS:
|
||||
* consul/connect: Interpolate the connect, service meta, and service canary meta blocks with the task environment [[GH-9586](https://github.com/hashicorp/nomad/pull/9586)]
|
||||
* consul/connect: enable configuring custom gateway task [[GH-9639](https://github.com/hashicorp/nomad/pull/9639)]
|
||||
* cli: Added JSON/go template formatting to agent-info command. [[GH-9788](https://github.com/hashicorp/nomad/pull/9788)]
|
||||
|
||||
|
||||
|
||||
BUG FIXES:
|
||||
* client: Fixed a bug where non-`docker` tasks with network isolation were restarted on client restart. [[GH-9757](https://github.com/hashicorp/nomad/issues/9757)]
|
||||
|
||||
@@ -1138,6 +1138,9 @@ func (ar *allocRunner) Restart(ctx context.Context, event *structs.TaskEvent, fa
|
||||
var err *multierror.Error
|
||||
var errMutex sync.Mutex
|
||||
|
||||
// run alloc task restart hooks
|
||||
ar.taskRestartHooks()
|
||||
|
||||
go func() {
|
||||
var wg sync.WaitGroup
|
||||
defer close(waitCh)
|
||||
@@ -1170,6 +1173,9 @@ func (ar *allocRunner) Restart(ctx context.Context, event *structs.TaskEvent, fa
|
||||
func (ar *allocRunner) RestartAll(taskEvent *structs.TaskEvent) error {
|
||||
var err *multierror.Error
|
||||
|
||||
// run alloc task restart hooks
|
||||
ar.taskRestartHooks()
|
||||
|
||||
for tn := range ar.tasks {
|
||||
rerr := ar.RestartTask(tn, taskEvent.Copy())
|
||||
if rerr != nil {
|
||||
|
||||
@@ -360,3 +360,28 @@ func (ar *allocRunner) shutdownHooks() {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (ar *allocRunner) taskRestartHooks() {
|
||||
for _, hook := range ar.runnerHooks {
|
||||
re, ok := hook.(interfaces.RunnerTaskRestartHook)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
name := re.Name()
|
||||
var start time.Time
|
||||
if ar.logger.IsTrace() {
|
||||
start = time.Now()
|
||||
ar.logger.Trace("running alloc task restart hook",
|
||||
"name", name, "start", start)
|
||||
}
|
||||
|
||||
re.PreTaskRestart()
|
||||
|
||||
if ar.logger.IsTrace() {
|
||||
end := time.Now()
|
||||
ar.logger.Trace("finished alloc task restart hook",
|
||||
"name", name, "end", end, "duration", end.Sub(start))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -93,7 +93,10 @@ func (h *groupServiceHook) Prerun() error {
|
||||
h.prerun = true
|
||||
h.mu.Unlock()
|
||||
}()
|
||||
return h.prerunLocked()
|
||||
}
|
||||
|
||||
func (h *groupServiceHook) prerunLocked() error {
|
||||
if len(h.services) == 0 {
|
||||
return nil
|
||||
}
|
||||
@@ -145,10 +148,26 @@ func (h *groupServiceHook) Update(req *interfaces.RunnerUpdateRequest) error {
|
||||
return h.consulClient.UpdateWorkload(oldWorkloadServices, newWorkloadServices)
|
||||
}
|
||||
|
||||
func (h *groupServiceHook) PreTaskRestart() error {
|
||||
h.mu.Lock()
|
||||
defer func() {
|
||||
// Mark prerun as true to unblock Updates
|
||||
h.prerun = true
|
||||
h.mu.Unlock()
|
||||
}()
|
||||
|
||||
h.preKillLocked()
|
||||
return h.prerunLocked()
|
||||
}
|
||||
|
||||
func (h *groupServiceHook) PreKill() {
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
h.preKillLocked()
|
||||
}
|
||||
|
||||
// preKillLocked implements the PreKill hook but requires the caller to hold the lock
|
||||
func (h *groupServiceHook) preKillLocked() {
|
||||
// If we have a shutdown delay deregister
|
||||
// group services and then wait
|
||||
// before continuing to kill tasks
|
||||
|
||||
@@ -22,6 +22,7 @@ var _ interfaces.RunnerPrerunHook = (*groupServiceHook)(nil)
|
||||
var _ interfaces.RunnerUpdateHook = (*groupServiceHook)(nil)
|
||||
var _ interfaces.RunnerPostrunHook = (*groupServiceHook)(nil)
|
||||
var _ interfaces.RunnerPreKillHook = (*groupServiceHook)(nil)
|
||||
var _ interfaces.RunnerTaskRestartHook = (*groupServiceHook)(nil)
|
||||
|
||||
// TestGroupServiceHook_NoGroupServices asserts calling group service hooks
|
||||
// without group services does not error.
|
||||
@@ -50,11 +51,17 @@ func TestGroupServiceHook_NoGroupServices(t *testing.T) {
|
||||
|
||||
require.NoError(t, h.Postrun())
|
||||
|
||||
require.NoError(t, h.PreTaskRestart())
|
||||
|
||||
ops := consulClient.GetOps()
|
||||
require.Len(t, ops, 4)
|
||||
require.Equal(t, "add", ops[0].Op)
|
||||
require.Equal(t, "update", ops[1].Op)
|
||||
require.Equal(t, "remove", ops[2].Op)
|
||||
require.Len(t, ops, 7)
|
||||
require.Equal(t, "add", ops[0].Op) // Prerun
|
||||
require.Equal(t, "update", ops[1].Op) // Update
|
||||
require.Equal(t, "remove", ops[2].Op) // Postrun (1st)
|
||||
require.Equal(t, "remove", ops[3].Op) // Postrun (2nd)
|
||||
require.Equal(t, "remove", ops[4].Op) // Restart -> preKill (1st)
|
||||
require.Equal(t, "remove", ops[5].Op) // Restart -> preKill (2nd)
|
||||
require.Equal(t, "add", ops[6].Op) // Restart -> preRun
|
||||
}
|
||||
|
||||
// TestGroupServiceHook_ShutdownDelayUpdate asserts calling group service hooks
|
||||
@@ -117,15 +124,21 @@ func TestGroupServiceHook_GroupServices(t *testing.T) {
|
||||
|
||||
require.NoError(t, h.Postrun())
|
||||
|
||||
require.NoError(t, h.PreTaskRestart())
|
||||
|
||||
ops := consulClient.GetOps()
|
||||
require.Len(t, ops, 4)
|
||||
require.Equal(t, "add", ops[0].Op)
|
||||
require.Equal(t, "update", ops[1].Op)
|
||||
require.Equal(t, "remove", ops[2].Op)
|
||||
require.Len(t, ops, 7)
|
||||
require.Equal(t, "add", ops[0].Op) // Prerun
|
||||
require.Equal(t, "update", ops[1].Op) // Update
|
||||
require.Equal(t, "remove", ops[2].Op) // Postrun (1st)
|
||||
require.Equal(t, "remove", ops[3].Op) // Postrun (2nd)
|
||||
require.Equal(t, "remove", ops[4].Op) // Restart -> preKill (1st)
|
||||
require.Equal(t, "remove", ops[5].Op) // Restart -> preKill (2nd)
|
||||
require.Equal(t, "add", ops[6].Op) // Restart -> preRun
|
||||
}
|
||||
|
||||
// TestGroupServiceHook_NoNetwork asserts group service hooks with group
|
||||
// services but no group network returns an error.
|
||||
// services but no group network is handled gracefully.
|
||||
func TestGroupServiceHook_NoNetwork(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
@@ -159,11 +172,17 @@ func TestGroupServiceHook_NoNetwork(t *testing.T) {
|
||||
|
||||
require.NoError(t, h.Postrun())
|
||||
|
||||
require.NoError(t, h.PreTaskRestart())
|
||||
|
||||
ops := consulClient.GetOps()
|
||||
require.Len(t, ops, 4)
|
||||
require.Equal(t, "add", ops[0].Op)
|
||||
require.Equal(t, "update", ops[1].Op)
|
||||
require.Equal(t, "remove", ops[2].Op)
|
||||
require.Len(t, ops, 7)
|
||||
require.Equal(t, "add", ops[0].Op) // Prerun
|
||||
require.Equal(t, "update", ops[1].Op) // Update
|
||||
require.Equal(t, "remove", ops[2].Op) // Postrun (1st)
|
||||
require.Equal(t, "remove", ops[3].Op) // Postrun (2nd)
|
||||
require.Equal(t, "remove", ops[4].Op) // Restart -> preKill (1st)
|
||||
require.Equal(t, "remove", ops[5].Op) // Restart -> preKill (2nd)
|
||||
require.Equal(t, "add", ops[6].Op) // Restart -> preRun
|
||||
}
|
||||
|
||||
func TestGroupServiceHook_getWorkloadServices(t *testing.T) {
|
||||
|
||||
@@ -55,6 +55,14 @@ type RunnerUpdateRequest struct {
|
||||
Alloc *structs.Allocation
|
||||
}
|
||||
|
||||
// RunnerTaskRestartHook is implemented by allocation runner hooks that need
|
||||
// to act just before the allocation runner is going to restart all tasks.
|
||||
type RunnerTaskRestartHook interface {
|
||||
RunnerHook
|
||||
|
||||
// PreTaskRestart is the callback run before the tasks are restarted. The
// returned error reports a failure of the hook.
PreTaskRestart() error
|
||||
}
|
||||
|
||||
// ShutdownHook may be implemented by AllocRunner or TaskRunner hooks and will
|
||||
// be called when the agent process is being shutdown gracefully.
|
||||
type ShutdownHook interface {
|
||||
|
||||
Reference in New Issue
Block a user