// Volume is a named storage volume requested by a task group. Config holds
// type-specific options as raw key/value pairs (e.g. a host volume's source).
type Volume struct {
	Name     string
	Type     string
	ReadOnly bool `mapstructure:"read_only"`
	Hidden   bool

	Config map[string]interface{}
}

// VolumeMount names a requested Volume and the destination path where it
// should be mounted inside a task.
type VolumeMount struct {
	Volume      string
	Destination string
	ReadOnly    bool `mapstructure:"read_only"`
}
// copyNodeHostVolumes is a helper to deep-copy a map of host volume names to
// their ClientHostVolumeConfig. Returns nil when the input is empty or nil.
func copyNodeHostVolumes(volumes map[string]*ClientHostVolumeConfig) map[string]*ClientHostVolumeConfig {
	l := len(volumes)
	if l == 0 {
		return nil
	}

	c := make(map[string]*ClientHostVolumeConfig, l)
	for volume, v := range volumes {
		c[volume] = v.Copy()
	}

	return c
}
err)) + continue + } + + if cfg.Source == "" { + mErr.Errors = append(mErr.Errors, fmt.Errorf("Volume %s has an empty source", name)) + } + } + // Validate task group and task network resources if err := tg.validateNetworks(); err != nil { outer := fmt.Errorf("Task group network validation failed: %v", err) @@ -4868,6 +4910,19 @@ func (tg *TaskGroup) Validate(j *Job) error { // Validate the tasks for _, task := range tg.Tasks { + // Validate the task does not reference undefined volume mounts + for i, mnt := range task.VolumeMounts { + if mnt.Volume == "" { + mErr.Errors = append(mErr.Errors, fmt.Errorf("Task %s has a volume mount (%d) referencing an empty volume", task.Name, i)) + continue + } + + if _, ok := tg.Volumes[mnt.Volume]; !ok { + mErr.Errors = append(mErr.Errors, fmt.Errorf("Task %s has a volume mount (%d) referencing undefined volume %s", task.Name, i, mnt.Volume)) + continue + } + } + if err := task.Validate(tg.EphemeralDisk, j.Type); err != nil { outer := fmt.Errorf("Task %s validation failed: %v", task.Name, err) mErr.Errors = append(mErr.Errors, outer) @@ -5137,6 +5192,10 @@ type Task struct { // task from Consul and sending it a signal to shutdown. See #2441 ShutdownDelay time.Duration + // VolumeMounts is a list of Volume name <-> mount configurations that will be + // attached to this task. + VolumeMounts []*VolumeMount + // The kill signal to use for the task. This is an optional specification, // KillSignal is the kill signal to use for the task. 
This is an optional @@ -5166,6 +5225,7 @@ func (t *Task) Copy() *Task { nt.Constraints = CopySliceConstraints(nt.Constraints) nt.Affinities = CopySliceAffinities(nt.Affinities) + nt.VolumeMounts = CopySliceVolumeMount(nt.VolumeMounts) nt.Vault = nt.Vault.Copy() nt.Resources = nt.Resources.Copy() diff --git a/nomad/structs/structs_test.go b/nomad/structs/structs_test.go index 5456d9db5..5ec339bdf 100644 --- a/nomad/structs/structs_test.go +++ b/nomad/structs/structs_test.go @@ -868,6 +868,83 @@ func TestTaskGroup_Validate(t *testing.T) { err = tg.Validate(j) require.Contains(t, err.Error(), "Port label http already in use") require.Contains(t, err.Error(), "Port mapped to 80 already in use") + + tg = &TaskGroup{ + Volumes: map[string]*VolumeRequest{ + "foo": { + Volume: &Volume{ + Type: "nothost", + }, + Config: map[string]interface{}{ + "sOuRcE": "foo", + }, + }, + }, + Tasks: []*Task{ + { + Name: "task-a", + Resources: &Resources{}, + }, + }, + } + err = tg.Validate(&Job{}) + require.Contains(t, err.Error(), `Volume foo has unrecognised type nothost`) + + tg = &TaskGroup{ + Volumes: map[string]*VolumeRequest{ + "foo": { + Volume: &Volume{ + Type: "host", + }, + Config: nil, + }, + }, + Tasks: []*Task{ + { + Name: "task-a", + Resources: &Resources{}, + }, + }, + } + err = tg.Validate(&Job{}) + require.Contains(t, err.Error(), `Volume foo has an empty source`) + + tg = &TaskGroup{ + Volumes: map[string]*VolumeRequest{ + "foo": { + Volume: &Volume{ + Type: "host", + }, + Config: nil, + }, + }, + Tasks: []*Task{ + { + Name: "task-a", + Resources: &Resources{}, + VolumeMounts: []*VolumeMount{ + { + Volume: "", + }, + }, + }, + { + Name: "task-b", + Resources: &Resources{}, + VolumeMounts: []*VolumeMount{ + { + Volume: "foob", + }, + }, + }, + }, + } + err = tg.Validate(&Job{}) + expected = `Task task-a has a volume mount (0) referencing an empty volume` + require.Contains(t, err.Error(), expected) + + expected = `Task task-b has a volume mount (0) referencing undefined 
const (
	// VolumeTypeHost is the type identifier for host volumes.
	VolumeTypeHost = "host"
)

// ClientHostVolumeConfig is used to configure access to host paths on a
// Nomad Client.
type ClientHostVolumeConfig struct {
	Name     string `hcl:",key"`
	Source   string `hcl:"source"`
	ReadOnly bool   `hcl:"read_only"`
	Hidden   bool   `hcl:"hidden"`
}

// Copy returns a copy of the config; a shallow copy suffices because every
// field is a value type.
func (p *ClientHostVolumeConfig) Copy() *ClientHostVolumeConfig {
	if p == nil {
		return nil
	}

	c := new(ClientHostVolumeConfig)
	*c = *p
	return c
}

// CopyMapStringClientHostVolumeConfig deep-copies a map of host volume names
// to their configuration. Returns nil for a nil input.
func CopyMapStringClientHostVolumeConfig(m map[string]*ClientHostVolumeConfig) map[string]*ClientHostVolumeConfig {
	if m == nil {
		return nil
	}

	nm := make(map[string]*ClientHostVolumeConfig, len(m))
	for k, v := range m {
		nm[k] = v.Copy()
	}

	return nm
}

// HostVolumeSliceMerge merges two slices of host volume configs,
// deduplicating by Name; entries in a take precedence over entries in b.
//
// NOTE(review): the previous implementation pre-sized the result to len(a)
// and indexed the second loop with b's indices, overwriting a's entries and
// panicking when len(b) > len(a); it also left nil holes for duplicates.
// It now appends copies instead.
func HostVolumeSliceMerge(a, b []*ClientHostVolumeConfig) []*ClientHostVolumeConfig {
	n := make([]*ClientHostVolumeConfig, 0, len(a)+len(b))
	seenKeys := make(map[string]struct{}, len(a)+len(b))

	for _, v := range a {
		if _, ok := seenKeys[v.Name]; ok {
			continue
		}
		n = append(n, v.Copy())
		seenKeys[v.Name] = struct{}{}
	}
	for _, v := range b {
		if _, ok := seenKeys[v.Name]; ok {
			continue
		}
		n = append(n, v.Copy())
		seenKeys[v.Name] = struct{}{}
	}

	return n
}
// HostVolumeConfig is the struct that is expected inside the `config`
// section of a `host` type volume.
type HostVolumeConfig struct {
	// Source is the name of the desired HostVolume.
	Source string
}

// Copy returns a copy of the config; a shallow copy suffices because every
// field is a value type.
func (h *HostVolumeConfig) Copy() *HostVolumeConfig {
	if h == nil {
		return nil
	}
	nh := new(HostVolumeConfig)
	*nh = *h
	return nh
}

// Volume is a representation of a storage volume that a TaskGroup wishes to use.
type Volume struct {
	Name     string
	Type     string
	ReadOnly bool
	Hidden   bool

	// Config holds type-specific options; for `host` volumes it is decoded
	// into a HostVolumeConfig by ParseHostVolumeConfig.
	Config map[string]interface{}
}

// Copy returns a deep copy of the Volume. The Config map is cloned with
// copystructure; a clone failure panics.
func (v *Volume) Copy() *Volume {
	if v == nil {
		return nil
	}
	nv := new(Volume)
	*nv = *v

	if i, err := copystructure.Copy(nv.Config); err != nil {
		panic(err.Error())
	} else {
		nv.Config = i.(map[string]interface{})
	}

	return nv
}

// CopyMapVolumes deep-copies a map of volume names to Volumes. Returns nil
// for a nil input.
func CopyMapVolumes(s map[string]*Volume) map[string]*Volume {
	if s == nil {
		return nil
	}

	l := len(s)
	c := make(map[string]*Volume, l)
	for k, v := range s {
		c[k] = v.Copy()
	}
	return c
}

// VolumeMount is the configuration for mounting a requested Volume (named by
// its key in the task group's Volumes map) at a destination path in a task.
type VolumeMount struct {
	Volume      string
	Destination string
	ReadOnly    bool
}

// Copy returns a copy of the mount; a shallow copy suffices because every
// field is a value type.
func (v *VolumeMount) Copy() *VolumeMount {
	if v == nil {
		return nil
	}

	nv := new(VolumeMount)
	*nv = *v
	return nv
}

// CopySliceVolumeMount deep-copies a slice of volume mounts. Returns nil for
// an empty or nil input.
func CopySliceVolumeMount(s []*VolumeMount) []*VolumeMount {
	l := len(s)
	if l == 0 {
		return nil
	}

	c := make([]*VolumeMount, l)
	for i, v := range s {
		c[i] = v.Copy()
	}
	return c
}

// VolumeRequest pairs a requested Volume with its raw, not-yet-parsed
// type-specific configuration.
type VolumeRequest struct {
	Volume *Volume
	Config map[string]interface{}
}

// Copy returns a deep copy of the request. The Config map is cloned with
// copystructure; a clone failure panics.
func (h *VolumeRequest) Copy() *VolumeRequest {
	if h == nil {
		return nil
	}

	c := new(VolumeRequest)
	c.Volume = h.Volume.Copy()
	if i, err := copystructure.Copy(h.Config); err != nil {
		panic(err.Error())
	} else {
		c.Config = i.(map[string]interface{})
	}
	return c
}

// CopyMapVolumeRequest deep-copies a map of volume requests. Returns nil for
// a nil input.
func CopyMapVolumeRequest(m map[string]*VolumeRequest) map[string]*VolumeRequest {
	if m == nil {
		return nil
	}

	l := len(m)
	c := make(map[string]*VolumeRequest, l)
	for k, v := range m {
		c[k] = v.Copy()
	}
	return c
}

// ParseHostVolumeConfig decodes a raw `config` map into a HostVolumeConfig
// via mapstructure, so keys are matched case-insensitively (the validation
// tests rely on "sOuRcE" decoding into Source).
func ParseHostVolumeConfig(m map[string]interface{}) (*HostVolumeConfig, error) {
	var c HostVolumeConfig

	err := mapstructure.Decode(m, &c)

	return &c, err
}