This commit is contained in:
Alex Dadgar
2016-11-23 15:48:36 -08:00
parent 22d494f812
commit 35d274df1f
4 changed files with 185 additions and 13 deletions

View File

@@ -102,6 +102,7 @@ func parseJob(result *structs.Job, list *ast.ObjectList) error {
delete(m, "update")
delete(m, "periodic")
delete(m, "vault")
delete(m, "dispatch")
// Set the ID and name to the object key
result.ID = obj.Keys[0].Token.Value().(string)
@@ -127,19 +128,20 @@ func parseJob(result *structs.Job, list *ast.ObjectList) error {
// Check for invalid keys
valid := []string{
"id",
"name",
"region",
"all_at_once",
"type",
"priority",
"datacenters",
"constraint",
"update",
"periodic",
"meta",
"task",
"datacenters",
"dispatch",
"group",
"id",
"meta",
"name",
"periodic",
"priority",
"region",
"task",
"type",
"update",
"vault",
"vault_token",
}
@@ -168,6 +170,13 @@ func parseJob(result *structs.Job, list *ast.ObjectList) error {
}
}
// If we have a dispatch definition, then parse that
if o := listVal.Filter("dispatch"); len(o.Items) > 0 {
if err := parseDispatch(&result.Dispatch, o); err != nil {
return multierror.Prefix(err, "dispatch ->")
}
}
// Parse out meta fields. These are in HCL as a list so we need
// to iterate over them and merge them.
if metaO := listVal.Filter("meta"); len(metaO.Items) > 0 {
@@ -552,6 +561,7 @@ func parseTasks(jobName string, taskGroupName string, result *[]*structs.Task, l
"artifact",
"config",
"constraint",
"dispatch_input",
"driver",
"env",
"kill_timeout",
@@ -574,6 +584,7 @@ func parseTasks(jobName string, taskGroupName string, result *[]*structs.Task, l
delete(m, "artifact")
delete(m, "config")
delete(m, "constraint")
delete(m, "dispatch_input")
delete(m, "env")
delete(m, "logs")
delete(m, "meta")
@@ -733,6 +744,33 @@ func parseTasks(jobName string, taskGroupName string, result *[]*structs.Task, l
t.Vault = v
}
// If we have a dispatch_input block parse that
if o := listVal.Filter("dispatch_input"); len(o.Items) > 0 {
if len(o.Items) > 1 {
return fmt.Errorf("only one dispatch_input block is allowed in a task. Number of logs block found: %d", len(o.Items))
}
var m map[string]interface{}
dispatchBlock := o.Items[0]
// Check for invalid keys
valid := []string{
"stdin",
"file",
}
if err := checkHCLKeys(dispatchBlock.Val, valid); err != nil {
return multierror.Prefix(err, fmt.Sprintf("'%s', dispatch_input ->", n))
}
if err := hcl.DecodeObject(&m, dispatchBlock.Val); err != nil {
return err
}
t.DispatchInput = &structs.DispatchInputConfig{}
if err := mapstructure.WeakDecode(m, t.DispatchInput); err != nil {
return err
}
}
*result = append(*result, &t)
}
@@ -1205,6 +1243,73 @@ func parseVault(result *structs.Vault, list *ast.ObjectList) error {
return nil
}
func parseDispatch(result **structs.DispatchConfig, list *ast.ObjectList) error {
list = list.Elem()
if len(list.Items) > 1 {
return fmt.Errorf("only one 'dispatch' block allowed per job")
}
// Get our resource object
o := list.Items[0]
var m map[string]interface{}
if err := hcl.DecodeObject(&m, o.Val); err != nil {
return err
}
delete(m, "meta")
// Check for invalid keys
valid := []string{
"input_data",
"meta_keys",
"paused",
}
if err := checkHCLKeys(o.Val, valid); err != nil {
return err
}
// Build the dispatch block
var d structs.DispatchConfig
if err := mapstructure.WeakDecode(m, &d); err != nil {
return err
}
var listVal *ast.ObjectList
if ot, ok := o.Val.(*ast.ObjectType); ok {
listVal = ot.List
} else {
return fmt.Errorf("dispatch block should be an object")
}
// Parse the meta block
if metaList := listVal.Filter("meta_keys"); len(metaList.Items) > 0 {
// Get our resource object
o := metaList.Items[0]
var m map[string]interface{}
if err := hcl.DecodeObject(&m, o.Val); err != nil {
return err
}
// Check for invalid keys
valid := []string{
"optional",
"required",
}
if err := checkHCLKeys(o.Val, valid); err != nil {
return err
}
if err := mapstructure.WeakDecode(m, &d); err != nil {
return err
}
}
*result = &d
return nil
}
func checkHCLKeys(node ast.Node, valid []string) error {
var list *ast.ObjectList
switch n := node.(type) {

View File

@@ -537,6 +537,52 @@ func TestParse(t *testing.T) {
},
false,
},
{
"dispatch.hcl",
&structs.Job{
ID: "dispatch",
Name: "dispatch",
Type: "service",
Priority: 50,
Region: "global",
Dispatch: &structs.DispatchConfig{
Paused: true,
InputData: "required",
MetaRequired: []string{"foo", "bar"},
MetaOptional: []string{"baz", "bam"},
},
TaskGroups: []*structs.TaskGroup{
&structs.TaskGroup{
Name: "foo",
Count: 1,
EphemeralDisk: structs.DefaultEphemeralDisk(),
Tasks: []*structs.Task{
&structs.Task{
Name: "bar",
Driver: "docker",
Resources: &structs.Resources{
CPU: 100,
MemoryMB: 10,
IOPS: 0,
},
LogConfig: &structs.LogConfig{
MaxFiles: 10,
MaxFileSizeMB: 10,
},
DispatchInput: &structs.DispatchInputConfig{
Stdin: true,
File: "foo/bar",
},
},
},
},
},
},
false,
},
}
for _, tc := range cases {

View File

@@ -0,0 +1,21 @@
job "dispatch" {
dispatch {
paused = true
input_data = "required"
meta_keys {
required = ["foo", "bar"]
optional = ["baz", "bam"]
}
}
group "foo" {
task "bar" {
driver = "docker"
resources {}
dispatch_input {
stdin = true
file = "foo/bar"
}
}
}
}

View File

@@ -1557,13 +1557,13 @@ type DispatchConfig struct {
Paused bool
// InputData configure the input data requirements
InputData string
InputData string `mapstructure:"input_data"`
// MetaRequired is metadata keys that must be specified by the dispatcher
MetaRequired []string
MetaRequired []string `mapstructure:"required"`
// MetaOptional is metadata keys that may be specified by the dispatcher
MetaOptional []string
MetaOptional []string `mapstructure:"optional"`
}
func (d *DispatchConfig) Validate() error {