There are bits of logic in the group/task cleanup hooks that call RemoveWorkload an extra time with the "Canary" version of the workload when the alloc is marked as a canary. This logic triggers an extra sync with Consul and does not achieve the intended behavior, for which no special-casing is necessary anyway: when a workload is marked for removal, all of its associated services and checks are removed regardless of canary status, because the service and check IDs do not incorporate canary-ness in the first place. The only place canary-ness matters is when updating a workload, where the Canary flag is part of the hash computed over the services and checks to determine whether they have been modified. Fixes #10842
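For illustration, here is a minimal sketch of the redundant pattern described above, written as if it were a helper on the serviceHook defined in the file below. The method name is hypothetical and this is not the actual Nomad cleanup-hook code:

// removeWithCanarySpecialCase is an illustrative reconstruction of the
// special-casing this change removes; it does not exist in Nomad.
func (h *serviceHook) removeWithCanarySpecialCase() {
	ws := h.getWorkloadServices()
	h.consulServices.RemoveWorkload(ws)

	// Redundant: service and check IDs do not encode canary status, so the
	// first RemoveWorkload already deregistered every service and check.
	// Flipping the Canary flag and removing again only forces one more
	// sync with Consul.
	if h.canary {
		wsCanary := *ws
		wsCanary.Canary = !ws.Canary
		h.consulServices.RemoveWorkload(&wsCanary)
	}
}

By contrast, Update below hands both the old and new workloads to UpdateWorkload, and there the Canary flag legitimately participates in the service/check hash used to detect modifications.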
package taskrunner

import (
	"context"
	"fmt"
	"sync"

	log "github.com/hashicorp/go-hclog"
	"github.com/hashicorp/nomad/client/allocrunner/interfaces"
	tinterfaces "github.com/hashicorp/nomad/client/allocrunner/taskrunner/interfaces"
	"github.com/hashicorp/nomad/client/consul"
	"github.com/hashicorp/nomad/client/taskenv"
	agentconsul "github.com/hashicorp/nomad/command/agent/consul"
	"github.com/hashicorp/nomad/nomad/structs"
	"github.com/hashicorp/nomad/plugins/drivers"
)

var _ interfaces.TaskPoststartHook = &serviceHook{}
var _ interfaces.TaskPreKillHook = &serviceHook{}
var _ interfaces.TaskExitedHook = &serviceHook{}
var _ interfaces.TaskStopHook = &serviceHook{}
var _ interfaces.TaskUpdateHook = &serviceHook{}

type serviceHookConfig struct {
	alloc           *structs.Allocation
	task            *structs.Task
	consulServices  consul.ConsulServiceAPI
	consulNamespace string

	// Restarter is a subset of the TaskLifecycle interface
	restarter agentconsul.WorkloadRestarter

	logger log.Logger
}

type serviceHook struct {
	allocID         string
	taskName        string
	consulNamespace string
	consulServices  consul.ConsulServiceAPI
	restarter       agentconsul.WorkloadRestarter
	logger          log.Logger

	// The following fields may be updated
	driverExec tinterfaces.ScriptExecutor
	driverNet  *drivers.DriverNetwork
	canary     bool
	services   []*structs.Service
	networks   structs.Networks
	ports      structs.AllocatedPorts
	taskEnv    *taskenv.TaskEnv

	// initialRegistration tracks whether Poststart has completed, initializing
	// fields required in other lifecycle funcs
	initialRegistration bool

	// Since Update() may be called concurrently with any other hook, all
	// hook methods must be fully serialized
	mu sync.Mutex
}

// newServiceHook builds a serviceHook from the given alloc and task, capturing
// the allocated ports and networks and flagging canary allocations.
func newServiceHook(c serviceHookConfig) *serviceHook {
	h := &serviceHook{
		allocID:         c.alloc.ID,
		taskName:        c.task.Name,
		consulServices:  c.consulServices,
		consulNamespace: c.consulNamespace,
		services:        c.task.Services,
		restarter:       c.restarter,
		ports:           c.alloc.AllocatedResources.Shared.Ports,
	}

	if res := c.alloc.AllocatedResources.Tasks[c.task.Name]; res != nil {
		h.networks = res.Networks
	}

	if c.alloc.DeploymentStatus != nil && c.alloc.DeploymentStatus.Canary {
		h.canary = true
	}

	h.logger = c.logger.Named(h.Name())
	return h
}

func (h *serviceHook) Name() string {
	return "consul_services"
}

// Poststart stores the driver's exec and network handles along with the task
// environment, then registers the task's services with Consul.
func (h *serviceHook) Poststart(ctx context.Context, req *interfaces.TaskPoststartRequest, _ *interfaces.TaskPoststartResponse) error {
	h.mu.Lock()
	defer h.mu.Unlock()

	// Store the TaskEnv for interpolating now and when Updating
	h.driverExec = req.DriverExec
	h.driverNet = req.DriverNetwork
	h.taskEnv = req.TaskEnv
	h.initialRegistration = true

	// Create task services struct with request's driver metadata
	workloadServices := h.getWorkloadServices()

	return h.consulServices.RegisterWorkload(workloadServices)
}

// Update rebuilds the hook's fields from the updated allocation and, once the
// initial registration has happened, syncs the old and new workloads with Consul.
func (h *serviceHook) Update(ctx context.Context, req *interfaces.TaskUpdateRequest, _ *interfaces.TaskUpdateResponse) error {
	h.mu.Lock()
	defer h.mu.Unlock()
	if !h.initialRegistration {
		// Initial registration has not finished, so don't touch Consul;
		// only update the hook fields.
		return h.updateHookFields(req)
	}

	// Create the old task services struct with the existing driver metadata,
	// as it cannot change across updates
	oldWorkloadServices := h.getWorkloadServices()

	if err := h.updateHookFields(req); err != nil {
		return err
	}

	// Create the new task services struct with the updated values
	newWorkloadServices := h.getWorkloadServices()

	return h.consulServices.UpdateWorkload(oldWorkloadServices, newWorkloadServices)
}

func (h *serviceHook) updateHookFields(req *interfaces.TaskUpdateRequest) error {
	// Pull the updated values out of the request
	canary := false
	if req.Alloc.DeploymentStatus != nil {
		canary = req.Alloc.DeploymentStatus.Canary
	}

	var networks structs.Networks
	if res := req.Alloc.AllocatedResources.Tasks[h.taskName]; res != nil {
		networks = res.Networks
	}

	task := req.Alloc.LookupTask(h.taskName)
	if task == nil {
		return fmt.Errorf("task %q not found in updated alloc", h.taskName)
	}

	// Update service hook fields
	h.taskEnv = req.TaskEnv
	h.services = task.Services
	h.networks = networks
	h.canary = canary
	h.ports = req.Alloc.AllocatedResources.Shared.Ports

	return nil
}

func (h *serviceHook) PreKilling(ctx context.Context, req *interfaces.TaskPreKillRequest, resp *interfaces.TaskPreKillResponse) error {
	h.mu.Lock()
	defer h.mu.Unlock()

	// Deregister before killing task
	h.deregister()

	return nil
}

func (h *serviceHook) Exited(context.Context, *interfaces.TaskExitedRequest, *interfaces.TaskExitedResponse) error {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.deregister()
	return nil
}

// deregister services from Consul.
func (h *serviceHook) deregister() {
	if len(h.services) > 0 {
		workloadServices := h.getWorkloadServices()
		h.consulServices.RemoveWorkload(workloadServices)
	}

	h.initialRegistration = false
}

func (h *serviceHook) Stop(ctx context.Context, req *interfaces.TaskStopRequest, resp *interfaces.TaskStopResponse) error {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.deregister()
	return nil
}

// getWorkloadServices assembles the WorkloadServices struct from the hook's
// current fields, interpolating the services with the task's environment.
func (h *serviceHook) getWorkloadServices() *agentconsul.WorkloadServices {
	// Interpolate with the task's environment
	interpolatedServices := taskenv.InterpolateServices(h.taskEnv, h.services)

	// Create task services struct with the hook's driver metadata
	return &agentconsul.WorkloadServices{
		AllocID:         h.allocID,
		Task:            h.taskName,
		ConsulNamespace: h.consulNamespace,
		Restarter:       h.restarter,
		Services:        interpolatedServices,
		DriverExec:      h.driverExec,
		DriverNetwork:   h.driverNet,
		Networks:        h.networks,
		Canary:          h.canary,
		Ports:           h.ports,
	}
}
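For orientation, a rough sketch of how a caller inside this package might wire the hook up. The surrounding variables (alloc, task, consulClient, taskLifecycle, logger) are assumptions standing in for the task runner's own state; they are not part of this file:

// Hypothetical wiring; the real task runner assembles this inside its hook setup.
hook := newServiceHook(serviceHookConfig{
	alloc:           alloc,         // *structs.Allocation being run
	task:            task,          // *structs.Task whose services get registered
	consulServices:  consulClient,  // consul.ConsulServiceAPI implementation
	consulNamespace: "default",     // assumed namespace
	restarter:       taskLifecycle, // agentconsul.WorkloadRestarter
	logger:          logger,
})
_ = hook // Poststart, Update, PreKilling, Exited, and Stop are then driven by the task lifecycle.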