From 7d044a340f67d346a0203e92de56d3a8347e7ea5 Mon Sep 17 00:00:00 2001 From: Danielle Lancashire Date: Tue, 11 Feb 2020 17:39:16 +0100 Subject: [PATCH] allocrunner: Push state from hooks to taskrunners This commit is an initial (read: janky) approach to forwarding state from an allocrunner hook to a taskrunner using a similar `hookResources` approach that tr's use internally. It should eventually probably be replaced with something a little bit more message based, but for things that only come from pre-run hooks, and don't change, it's probably fine for now. --- client/allocrunner/alloc_runner.go | 4 +++ client/allocrunner/alloc_runner_hooks.go | 35 +++++++++++++++++++- client/allocrunner/csi_hook.go | 8 +++-- client/allocrunner/taskrunner/task_runner.go | 6 ++++ client/structs/allochook.go | 29 ++++++++++++++++ 5 files changed, 79 insertions(+), 3 deletions(-) create mode 100644 client/structs/allochook.go diff --git a/client/allocrunner/alloc_runner.go b/client/allocrunner/alloc_runner.go index cd43318c1..364f7b884 100644 --- a/client/allocrunner/alloc_runner.go +++ b/client/allocrunner/alloc_runner.go @@ -120,6 +120,10 @@ type allocRunner struct { // transistions. runnerHooks []interfaces.RunnerHook + // hookState is the output of allocrunner hooks + hookState *cstructs.AllocHookResources + hookStateMu sync.RWMutex + // tasks are the set of task runners tasks map[string]*taskrunner.TaskRunner diff --git a/client/allocrunner/alloc_runner_hooks.go b/client/allocrunner/alloc_runner_hooks.go index 06f9d381f..73c5f35eb 100644 --- a/client/allocrunner/alloc_runner_hooks.go +++ b/client/allocrunner/alloc_runner_hooks.go @@ -7,11 +7,41 @@ import ( multierror "github.com/hashicorp/go-multierror" "github.com/hashicorp/nomad/client/allocrunner/interfaces" clientconfig "github.com/hashicorp/nomad/client/config" + cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/client/taskenv" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/plugins/drivers" ) +type hookResourceSetter interface { + GetAllocHookResources() *cstructs.AllocHookResources + SetAllocHookResources(*cstructs.AllocHookResources) +} + +type allocHookResourceSetter struct { + ar *allocRunner +} + +func (a *allocHookResourceSetter) GetAllocHookResources() *cstructs.AllocHookResources { + a.ar.hookStateMu.RLock() + defer a.ar.hookStateMu.RUnlock() + + return a.ar.hookState +} + +func (a *allocHookResourceSetter) SetAllocHookResources(res *cstructs.AllocHookResources) { + a.ar.hookStateMu.Lock() + defer a.ar.hookStateMu.Unlock() + + a.ar.hookState = res + + // Propagate to all of the TRs within the lock to ensure consistent state. + // TODO: Refactor so TR's pull state from AR? + for _, tr := range a.ar.tasks { + tr.SetAllocHookResources(res) + } +} + type networkIsolationSetter interface { SetNetworkIsolation(*drivers.NetworkIsolationSpec) } @@ -105,6 +135,9 @@ func (ar *allocRunner) initRunnerHooks(config *clientconfig.Config) error { // create network isolation setting shim ns := &allocNetworkIsolationSetter{ar: ar} + // create hook resource setting shim + hrs := &allocHookResourceSetter{ar: ar} + // build the network manager nm, err := newNetworkManager(ar.Alloc(), ar.driverManager) if err != nil { @@ -134,7 +167,7 @@ func (ar *allocRunner) initRunnerHooks(config *clientconfig.Config) error { logger: hookLogger, }), newConsulSockHook(hookLogger, alloc, ar.allocDir, config.ConsulConfig), - newCSIHook(hookLogger, alloc, ar.rpcClient, ar.csiManager), + newCSIHook(hookLogger, alloc, ar.rpcClient, ar.csiManager, hrs), } return nil diff --git a/client/allocrunner/csi_hook.go b/client/allocrunner/csi_hook.go index de4f46da5..aa281d2cc 100644 --- a/client/allocrunner/csi_hook.go +++ b/client/allocrunner/csi_hook.go @@ -18,6 +18,7 @@ type csiHook struct { logger hclog.Logger csimanager csimanager.Manager rpcClient RPCer + updater hookResourceSetter } func (c *csiHook) Name() string { @@ -50,7 +51,9 @@ func (c *csiHook) Prerun() error { mounts[alias] = mountInfo } - // TODO: Propagate mounts back to the tasks. + res := c.updater.GetAllocHookResources() + res.CSIMounts = mounts + c.updater.SetAllocHookResources(res) return nil } @@ -91,12 +94,13 @@ func (c *csiHook) csiVolumesFromAlloc() (map[string]*structs.CSIVolume, error) { return csiVols, nil } -func newCSIHook(logger hclog.Logger, alloc *structs.Allocation, rpcClient RPCer, csi csimanager.Manager) *csiHook { +func newCSIHook(logger hclog.Logger, alloc *structs.Allocation, rpcClient RPCer, csi csimanager.Manager, updater hookResourceSetter) *csiHook { return &csiHook{ alloc: alloc, logger: logger.Named("csi_hook"), rpcClient: rpcClient, csimanager: csi, + updater: updater, } } diff --git a/client/allocrunner/taskrunner/task_runner.go b/client/allocrunner/taskrunner/task_runner.go index d33d740f3..e8a054e4c 100644 --- a/client/allocrunner/taskrunner/task_runner.go +++ b/client/allocrunner/taskrunner/task_runner.go @@ -220,6 +220,8 @@ type TaskRunner struct { networkIsolationLock sync.Mutex networkIsolationSpec *drivers.NetworkIsolationSpec + + allocHookResources *cstructs.AllocHookResources } type Config struct { @@ -1408,3 +1410,7 @@ func (tr *TaskRunner) TaskExecHandler() drivermanager.TaskExecHandler { func (tr *TaskRunner) DriverCapabilities() (*drivers.Capabilities, error) { return tr.driver.Capabilities() } + +func (tr *TaskRunner) SetAllocHookResources(res *cstructs.AllocHookResources) { + tr.allocHookResources = res +} diff --git a/client/structs/allochook.go b/client/structs/allochook.go new file mode 100644 index 000000000..59c56c0f7 --- /dev/null +++ b/client/structs/allochook.go @@ -0,0 +1,29 @@ +package structs + +import ( + "sync" + + "github.com/hashicorp/nomad/client/pluginmanager/csimanager" +) + +// AllocHookResources contains data that is provided by AllocRunner Hooks for +// consumption by TaskRunners +type AllocHookResources struct { + CSIMounts map[string]*csimanager.MountInfo + + mu sync.RWMutex +} + +func (a *AllocHookResources) GetCSIMounts() map[string]*csimanager.MountInfo { + a.mu.RLock() + defer a.mu.RUnlock() + + return a.CSIMounts +} + +func (a *AllocHookResources) SetCSIMounts(m map[string]*csimanager.MountInfo) { + a.mu.Lock() + defer a.mu.Unlock() + + a.CSIMounts = m +}