allocrunner: Push state from hooks to taskrunners

This commit is an initial (read: janky) approach to forwarding state
from an allocrunner hook to a taskrunner using a similar `hookResources`
approach that tr's use internally.

It should eventually probably be replaced with something a little bit
more message based, but for things that only come from pre-run hooks,
and don't change, it's probably fine for now.
This commit is contained in:
Danielle Lancashire
2020-02-11 17:39:16 +01:00
committed by Tim Gross
parent 246f210975
commit 7d044a340f
5 changed files with 79 additions and 3 deletions

View File

@@ -120,6 +120,10 @@ type allocRunner struct {
// transistions.
runnerHooks []interfaces.RunnerHook
// hookState is the output of allocrunner hooks
hookState *cstructs.AllocHookResources
hookStateMu sync.RWMutex
// tasks are the set of task runners
tasks map[string]*taskrunner.TaskRunner

View File

@@ -7,11 +7,41 @@ import (
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
clientconfig "github.com/hashicorp/nomad/client/config"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/client/taskenv"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/drivers"
)
type hookResourceSetter interface {
GetAllocHookResources() *cstructs.AllocHookResources
SetAllocHookResources(*cstructs.AllocHookResources)
}
type allocHookResourceSetter struct {
ar *allocRunner
}
func (a *allocHookResourceSetter) GetAllocHookResources() *cstructs.AllocHookResources {
a.ar.hookStateMu.RLock()
defer a.ar.hookStateMu.RUnlock()
return a.ar.hookState
}
func (a *allocHookResourceSetter) SetAllocHookResources(res *cstructs.AllocHookResources) {
a.ar.hookStateMu.Lock()
defer a.ar.hookStateMu.Unlock()
a.ar.hookState = res
// Propagate to all of the TRs within the lock to ensure consistent state.
// TODO: Refactor so TR's pull state from AR?
for _, tr := range a.ar.tasks {
tr.SetAllocHookResources(res)
}
}
type networkIsolationSetter interface {
SetNetworkIsolation(*drivers.NetworkIsolationSpec)
}
@@ -105,6 +135,9 @@ func (ar *allocRunner) initRunnerHooks(config *clientconfig.Config) error {
// create network isolation setting shim
ns := &allocNetworkIsolationSetter{ar: ar}
// create hook resource setting shim
hrs := &allocHookResourceSetter{ar: ar}
// build the network manager
nm, err := newNetworkManager(ar.Alloc(), ar.driverManager)
if err != nil {
@@ -134,7 +167,7 @@ func (ar *allocRunner) initRunnerHooks(config *clientconfig.Config) error {
logger: hookLogger,
}),
newConsulSockHook(hookLogger, alloc, ar.allocDir, config.ConsulConfig),
newCSIHook(hookLogger, alloc, ar.rpcClient, ar.csiManager),
newCSIHook(hookLogger, alloc, ar.rpcClient, ar.csiManager, hrs),
}
return nil

View File

@@ -18,6 +18,7 @@ type csiHook struct {
logger hclog.Logger
csimanager csimanager.Manager
rpcClient RPCer
updater hookResourceSetter
}
func (c *csiHook) Name() string {
@@ -50,7 +51,9 @@ func (c *csiHook) Prerun() error {
mounts[alias] = mountInfo
}
// TODO: Propagate mounts back to the tasks.
res := c.updater.GetAllocHookResources()
res.CSIMounts = mounts
c.updater.SetAllocHookResources(res)
return nil
}
@@ -91,12 +94,13 @@ func (c *csiHook) csiVolumesFromAlloc() (map[string]*structs.CSIVolume, error) {
return csiVols, nil
}
func newCSIHook(logger hclog.Logger, alloc *structs.Allocation, rpcClient RPCer, csi csimanager.Manager) *csiHook {
func newCSIHook(logger hclog.Logger, alloc *structs.Allocation, rpcClient RPCer, csi csimanager.Manager, updater hookResourceSetter) *csiHook {
return &csiHook{
alloc: alloc,
logger: logger.Named("csi_hook"),
rpcClient: rpcClient,
csimanager: csi,
updater: updater,
}
}

View File

@@ -220,6 +220,8 @@ type TaskRunner struct {
networkIsolationLock sync.Mutex
networkIsolationSpec *drivers.NetworkIsolationSpec
allocHookResources *cstructs.AllocHookResources
}
type Config struct {
@@ -1408,3 +1410,7 @@ func (tr *TaskRunner) TaskExecHandler() drivermanager.TaskExecHandler {
func (tr *TaskRunner) DriverCapabilities() (*drivers.Capabilities, error) {
return tr.driver.Capabilities()
}
func (tr *TaskRunner) SetAllocHookResources(res *cstructs.AllocHookResources) {
tr.allocHookResources = res
}

View File

@@ -0,0 +1,29 @@
package structs
import (
"sync"
"github.com/hashicorp/nomad/client/pluginmanager/csimanager"
)
// AllocHookResources contains data that is provided by AllocRunner Hooks for
// consumption by TaskRunners
type AllocHookResources struct {
CSIMounts map[string]*csimanager.MountInfo
mu sync.RWMutex
}
func (a *AllocHookResources) GetCSIMounts() map[string]*csimanager.MountInfo {
a.mu.RLock()
defer a.mu.RUnlock()
return a.CSIMounts
}
func (a *AllocHookResources) SetCSIMounts(m map[string]*csimanager.MountInfo) {
a.mu.Lock()
defer a.mu.Unlock()
a.CSIMounts = m
}