mirror of
https://github.com/kemko/nomad.git
synced 2026-01-07 10:55:42 +03:00
Merge branch 'master' into docs-networking
This commit is contained in:
@@ -7,6 +7,7 @@ BACKWARDS INCOMPATIBILITIES:
|
||||
* core: Removed weight and hard/soft fields in constraints [GH-351]
|
||||
* drivers: Qemu and Java driver configurations have been updated to both use
|
||||
`artifact_source` as the source for external images/jars to be ran
|
||||
* New reserved and dynamic port specification [GH-415]
|
||||
|
||||
FEATURES:
|
||||
|
||||
|
||||
@@ -27,7 +27,7 @@ type ServiceCheck struct {
|
||||
Name string
|
||||
Type string
|
||||
Script string
|
||||
Http string
|
||||
Path string
|
||||
Protocol string
|
||||
Interval time.Duration
|
||||
Timeout time.Duration
|
||||
|
||||
@@ -33,9 +33,10 @@ type AllocStateUpdater func(alloc *structs.Allocation) error
|
||||
|
||||
// AllocRunner is used to wrap an allocation and provide the execution context.
|
||||
type AllocRunner struct {
|
||||
config *config.Config
|
||||
updater AllocStateUpdater
|
||||
logger *log.Logger
|
||||
config *config.Config
|
||||
updater AllocStateUpdater
|
||||
logger *log.Logger
|
||||
consulClient *ConsulClient
|
||||
|
||||
alloc *structs.Allocation
|
||||
|
||||
@@ -66,18 +67,20 @@ type allocRunnerState struct {
|
||||
}
|
||||
|
||||
// NewAllocRunner is used to create a new allocation context
|
||||
func NewAllocRunner(logger *log.Logger, config *config.Config, updater AllocStateUpdater, alloc *structs.Allocation) *AllocRunner {
|
||||
func NewAllocRunner(logger *log.Logger, config *config.Config, updater AllocStateUpdater,
|
||||
alloc *structs.Allocation, consulClient *ConsulClient) *AllocRunner {
|
||||
ar := &AllocRunner{
|
||||
config: config,
|
||||
updater: updater,
|
||||
logger: logger,
|
||||
alloc: alloc,
|
||||
dirtyCh: make(chan struct{}, 1),
|
||||
tasks: make(map[string]*TaskRunner),
|
||||
restored: make(map[string]struct{}),
|
||||
updateCh: make(chan *structs.Allocation, 8),
|
||||
destroyCh: make(chan struct{}),
|
||||
waitCh: make(chan struct{}),
|
||||
config: config,
|
||||
updater: updater,
|
||||
logger: logger,
|
||||
alloc: alloc,
|
||||
consulClient: consulClient,
|
||||
dirtyCh: make(chan struct{}, 1),
|
||||
tasks: make(map[string]*TaskRunner),
|
||||
restored: make(map[string]struct{}),
|
||||
updateCh: make(chan *structs.Allocation, 8),
|
||||
destroyCh: make(chan struct{}),
|
||||
waitCh: make(chan struct{}),
|
||||
}
|
||||
return ar
|
||||
}
|
||||
@@ -109,7 +112,8 @@ func (r *AllocRunner) RestoreState() error {
|
||||
task := &structs.Task{Name: name}
|
||||
restartTracker := newRestartTracker(r.alloc.Job.Type, r.RestartPolicy)
|
||||
tr := NewTaskRunner(r.logger, r.config, r.setTaskState, r.ctx,
|
||||
r.alloc.ID, task, r.alloc.TaskStates[task.Name], restartTracker)
|
||||
r.alloc.ID, task, r.alloc.TaskStates[task.Name], restartTracker,
|
||||
r.consulClient)
|
||||
r.tasks[name] = tr
|
||||
|
||||
// Skip tasks in terminal states.
|
||||
@@ -320,7 +324,8 @@ func (r *AllocRunner) Run() {
|
||||
task.Resources = alloc.TaskResources[task.Name]
|
||||
restartTracker := newRestartTracker(r.alloc.Job.Type, r.RestartPolicy)
|
||||
tr := NewTaskRunner(r.logger, r.config, r.setTaskState, r.ctx,
|
||||
r.alloc.ID, task, r.alloc.TaskStates[task.Name], restartTracker)
|
||||
r.alloc.ID, task, r.alloc.TaskStates[task.Name], restartTracker,
|
||||
r.consulClient)
|
||||
r.tasks[task.Name] = tr
|
||||
go tr.Run()
|
||||
}
|
||||
|
||||
@@ -31,12 +31,13 @@ func testAllocRunner(restarts bool) (*MockAllocStateUpdater, *AllocRunner) {
|
||||
conf.AllocDir = os.TempDir()
|
||||
upd := &MockAllocStateUpdater{}
|
||||
alloc := mock.Alloc()
|
||||
consulClient, _ := NewConsulClient(logger, "127.0.0.1:8500")
|
||||
if !restarts {
|
||||
alloc.Job.Type = structs.JobTypeBatch
|
||||
*alloc.Job.LookupTaskGroup(alloc.TaskGroup).RestartPolicy = structs.RestartPolicy{Attempts: 0}
|
||||
}
|
||||
|
||||
ar := NewAllocRunner(logger, conf, upd.Update, alloc)
|
||||
ar := NewAllocRunner(logger, conf, upd.Update, alloc, consulClient)
|
||||
return upd, ar
|
||||
}
|
||||
|
||||
@@ -64,7 +65,7 @@ func TestAllocRunner_Destroy(t *testing.T) {
|
||||
// Ensure task takes some time
|
||||
task := ar.alloc.Job.TaskGroups[0].Tasks[0]
|
||||
task.Config["command"] = "/bin/sleep"
|
||||
task.Config["args"] = "10"
|
||||
task.Config["args"] = []string{"10"}
|
||||
go ar.Run()
|
||||
start := time.Now()
|
||||
|
||||
@@ -96,7 +97,7 @@ func TestAllocRunner_Update(t *testing.T) {
|
||||
// Ensure task takes some time
|
||||
task := ar.alloc.Job.TaskGroups[0].Tasks[0]
|
||||
task.Config["command"] = "/bin/sleep"
|
||||
task.Config["args"] = "10"
|
||||
task.Config["args"] = []string{"10"}
|
||||
go ar.Run()
|
||||
defer ar.Destroy()
|
||||
start := time.Now()
|
||||
@@ -129,7 +130,7 @@ func TestAllocRunner_SaveRestoreState(t *testing.T) {
|
||||
// Ensure task takes some time
|
||||
task := ar.alloc.Job.TaskGroups[0].Tasks[0]
|
||||
task.Config["command"] = "/bin/sleep"
|
||||
task.Config["args"] = "10"
|
||||
task.Config["args"] = []string{"10"}
|
||||
go ar.Run()
|
||||
defer ar.Destroy()
|
||||
|
||||
@@ -141,8 +142,9 @@ func TestAllocRunner_SaveRestoreState(t *testing.T) {
|
||||
}
|
||||
|
||||
// Create a new alloc runner
|
||||
consulClient, err := NewConsulClient(ar.logger, "127.0.0.1:8500")
|
||||
ar2 := NewAllocRunner(ar.logger, ar.config, upd.Update,
|
||||
&structs.Allocation{ID: ar.alloc.ID})
|
||||
&structs.Allocation{ID: ar.alloc.ID}, consulClient)
|
||||
err = ar2.RestoreState()
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
|
||||
@@ -70,6 +70,8 @@ type Client struct {
|
||||
|
||||
logger *log.Logger
|
||||
|
||||
consulClient *ConsulClient
|
||||
|
||||
lastServer net.Addr
|
||||
lastRPCTime time.Time
|
||||
lastServerLock sync.Mutex
|
||||
@@ -96,14 +98,22 @@ func NewClient(cfg *config.Config) (*Client, error) {
|
||||
// Create a logger
|
||||
logger := log.New(cfg.LogOutput, "", log.LstdFlags)
|
||||
|
||||
// Create the consul client
|
||||
consulAddr := cfg.ReadDefault("consul.address", "127.0.0.1:8500")
|
||||
consulClient, err := NewConsulClient(logger, consulAddr)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create the consul client: %v", err)
|
||||
}
|
||||
|
||||
// Create the client
|
||||
c := &Client{
|
||||
config: cfg,
|
||||
start: time.Now(),
|
||||
connPool: nomad.NewPool(cfg.LogOutput, clientRPCCache, clientMaxStreams, nil),
|
||||
logger: logger,
|
||||
allocs: make(map[string]*AllocRunner),
|
||||
shutdownCh: make(chan struct{}),
|
||||
config: cfg,
|
||||
start: time.Now(),
|
||||
consulClient: consulClient,
|
||||
connPool: nomad.NewPool(cfg.LogOutput, clientRPCCache, clientMaxStreams, nil),
|
||||
logger: logger,
|
||||
allocs: make(map[string]*AllocRunner),
|
||||
shutdownCh: make(chan struct{}),
|
||||
}
|
||||
|
||||
// Initialize the client
|
||||
@@ -136,6 +146,9 @@ func NewClient(cfg *config.Config) (*Client, error) {
|
||||
|
||||
// Start the client!
|
||||
go c.run()
|
||||
|
||||
// Start the consul client
|
||||
go c.consulClient.SyncWithConsul()
|
||||
return c, nil
|
||||
}
|
||||
|
||||
@@ -200,6 +213,9 @@ func (c *Client) Shutdown() error {
|
||||
}
|
||||
}
|
||||
|
||||
// Stop the consul client
|
||||
c.consulClient.ShutDown()
|
||||
|
||||
c.shutdown = true
|
||||
close(c.shutdownCh)
|
||||
c.connPool.Shutdown()
|
||||
@@ -335,7 +351,7 @@ func (c *Client) restoreState() error {
|
||||
for _, entry := range list {
|
||||
id := entry.Name()
|
||||
alloc := &structs.Allocation{ID: id}
|
||||
ar := NewAllocRunner(c.logger, c.config, c.updateAllocStatus, alloc)
|
||||
ar := NewAllocRunner(c.logger, c.config, c.updateAllocStatus, alloc, c.consulClient)
|
||||
c.allocs[id] = ar
|
||||
if err := ar.RestoreState(); err != nil {
|
||||
c.logger.Printf("[ERR] client: failed to restore state for alloc %s: %v", id, err)
|
||||
@@ -749,7 +765,7 @@ func (c *Client) updateAlloc(exist, update *structs.Allocation) error {
|
||||
func (c *Client) addAlloc(alloc *structs.Allocation) error {
|
||||
c.allocLock.Lock()
|
||||
defer c.allocLock.Unlock()
|
||||
ar := NewAllocRunner(c.logger, c.config, c.updateAllocStatus, alloc)
|
||||
ar := NewAllocRunner(c.logger, c.config, c.updateAllocStatus, alloc, c.consulClient)
|
||||
c.allocs[alloc.ID] = ar
|
||||
go ar.Run()
|
||||
return nil
|
||||
|
||||
@@ -336,7 +336,7 @@ func TestClient_SaveRestoreState(t *testing.T) {
|
||||
alloc1.NodeID = c1.Node().ID
|
||||
task := alloc1.Job.TaskGroups[0].Tasks[0]
|
||||
task.Config["command"] = "/bin/sleep"
|
||||
task.Config["args"] = "10"
|
||||
task.Config["args"] = []string{"10"}
|
||||
|
||||
state := s1.State()
|
||||
err := state.UpsertAllocs(100,
|
||||
|
||||
201
client/consul.go
Normal file
201
client/consul.go
Normal file
@@ -0,0 +1,201 @@
|
||||
package client
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
consul "github.com/hashicorp/consul/api"
|
||||
"github.com/hashicorp/go-multierror"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
"log"
|
||||
"net/url"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
syncInterval = 5 * time.Second
|
||||
)
|
||||
|
||||
type trackedService struct {
|
||||
allocId string
|
||||
task *structs.Task
|
||||
service *structs.Service
|
||||
}
|
||||
|
||||
type ConsulClient struct {
|
||||
client *consul.Client
|
||||
logger *log.Logger
|
||||
shutdownCh chan struct{}
|
||||
|
||||
trackedServices map[string]*trackedService
|
||||
trackedSrvLock sync.Mutex
|
||||
}
|
||||
|
||||
func NewConsulClient(logger *log.Logger, consulAddr string) (*ConsulClient, error) {
|
||||
var err error
|
||||
var c *consul.Client
|
||||
cfg := consul.DefaultConfig()
|
||||
cfg.Address = consulAddr
|
||||
if c, err = consul.NewClient(cfg); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
consulClient := ConsulClient{
|
||||
client: c,
|
||||
logger: logger,
|
||||
trackedServices: make(map[string]*trackedService),
|
||||
shutdownCh: make(chan struct{}),
|
||||
}
|
||||
|
||||
return &consulClient, nil
|
||||
}
|
||||
|
||||
func (c *ConsulClient) Register(task *structs.Task, allocID string) error {
|
||||
var mErr multierror.Error
|
||||
for _, service := range task.Services {
|
||||
c.logger.Printf("[INFO] Registering service %s with Consul.", service.Name)
|
||||
if err := c.registerService(service, task, allocID); err != nil {
|
||||
mErr.Errors = append(mErr.Errors, err)
|
||||
}
|
||||
ts := &trackedService{
|
||||
allocId: allocID,
|
||||
task: task,
|
||||
service: service,
|
||||
}
|
||||
c.trackedSrvLock.Lock()
|
||||
c.trackedServices[service.Id] = ts
|
||||
c.trackedSrvLock.Unlock()
|
||||
}
|
||||
|
||||
return mErr.ErrorOrNil()
|
||||
}
|
||||
|
||||
func (c *ConsulClient) Deregister(task *structs.Task) error {
|
||||
var mErr multierror.Error
|
||||
for _, service := range task.Services {
|
||||
c.logger.Printf("[INFO] De-Registering service %v with Consul", service.Name)
|
||||
if err := c.deregisterService(service.Id); err != nil {
|
||||
c.logger.Printf("[ERROR] Error in de-registering service %v from Consul", service.Name)
|
||||
mErr.Errors = append(mErr.Errors, err)
|
||||
}
|
||||
c.trackedSrvLock.Lock()
|
||||
delete(c.trackedServices, service.Id)
|
||||
c.trackedSrvLock.Unlock()
|
||||
}
|
||||
return mErr.ErrorOrNil()
|
||||
}
|
||||
|
||||
func (c *ConsulClient) ShutDown() {
|
||||
close(c.shutdownCh)
|
||||
}
|
||||
|
||||
func (c *ConsulClient) findPortAndHostForLabel(portLabel string, task *structs.Task) (string, int) {
|
||||
for _, network := range task.Resources.Networks {
|
||||
if p, ok := network.MapLabelToValues()[portLabel]; ok {
|
||||
return network.IP, p
|
||||
}
|
||||
}
|
||||
return "", 0
|
||||
}
|
||||
|
||||
func (c *ConsulClient) SyncWithConsul() {
|
||||
sync := time.After(syncInterval)
|
||||
agent := c.client.Agent()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-sync:
|
||||
sync = time.After(syncInterval)
|
||||
var consulServices map[string]*consul.AgentService
|
||||
var err error
|
||||
|
||||
// Get the list of the services that Consul knows about
|
||||
if consulServices, err = agent.Services(); err != nil {
|
||||
c.logger.Printf("[DEBUG] Error while syncing services with Consul: %v", err)
|
||||
continue
|
||||
}
|
||||
|
||||
// See if we have services that Consul doesn't know about yet.
|
||||
// Register with Consul the services which are not registered
|
||||
for serviceId := range c.trackedServices {
|
||||
if _, ok := consulServices[serviceId]; !ok {
|
||||
ts := c.trackedServices[serviceId]
|
||||
c.registerService(ts.service, ts.task, ts.allocId)
|
||||
}
|
||||
}
|
||||
|
||||
// See if consul thinks we have some services which are not running
|
||||
// anymore on the node. We de-register those services
|
||||
for serviceId := range consulServices {
|
||||
if serviceId == "consul" {
|
||||
continue
|
||||
}
|
||||
if _, ok := c.trackedServices[serviceId]; !ok {
|
||||
if err := c.deregisterService(serviceId); err != nil {
|
||||
c.logger.Printf("[DEBUG] Error while de-registering service with ID: %s", serviceId)
|
||||
}
|
||||
}
|
||||
}
|
||||
case <-c.shutdownCh:
|
||||
c.logger.Printf("[INFO] Shutting down Consul Client")
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *ConsulClient) registerService(service *structs.Service, task *structs.Task, allocID string) error {
|
||||
var mErr multierror.Error
|
||||
service.Id = fmt.Sprintf("%s-%s", allocID, task.Name)
|
||||
host, port := c.findPortAndHostForLabel(service.PortLabel, task)
|
||||
if host == "" || port == 0 {
|
||||
return fmt.Errorf("The port:%s marked for registration of service: %s couldn't be found", service.PortLabel, service.Name)
|
||||
}
|
||||
checks := c.makeChecks(service, host, port)
|
||||
asr := &consul.AgentServiceRegistration{
|
||||
ID: service.Id,
|
||||
Name: service.Name,
|
||||
Tags: service.Tags,
|
||||
Port: port,
|
||||
Address: host,
|
||||
Checks: checks,
|
||||
}
|
||||
if err := c.client.Agent().ServiceRegister(asr); err != nil {
|
||||
c.logger.Printf("[ERROR] Error while registering service %v with Consul: %v", service.Name, err)
|
||||
mErr.Errors = append(mErr.Errors, err)
|
||||
}
|
||||
return mErr.ErrorOrNil()
|
||||
}
|
||||
|
||||
func (c *ConsulClient) deregisterService(serviceId string) error {
|
||||
if err := c.client.Agent().ServiceDeregister(serviceId); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *ConsulClient) makeChecks(service *structs.Service, ip string, port int) []*consul.AgentServiceCheck {
|
||||
var checks []*consul.AgentServiceCheck
|
||||
for _, check := range service.Checks {
|
||||
c := &consul.AgentServiceCheck{
|
||||
Interval: check.Interval.String(),
|
||||
Timeout: check.Timeout.String(),
|
||||
}
|
||||
switch check.Type {
|
||||
case structs.ServiceCheckHTTP:
|
||||
if check.Protocol == "" {
|
||||
check.Protocol = "http"
|
||||
}
|
||||
url := url.URL{
|
||||
Scheme: check.Protocol,
|
||||
Host: fmt.Sprintf("%s:%d", ip, port),
|
||||
Path: check.Path,
|
||||
}
|
||||
c.HTTP = url.String()
|
||||
case structs.ServiceCheckTCP:
|
||||
c.TCP = fmt.Sprintf("%s:%d", ip, port)
|
||||
case structs.ServiceCheckScript:
|
||||
c.Script = check.Script // TODO This needs to include the path of the alloc dir and based on driver types
|
||||
}
|
||||
checks = append(checks, c)
|
||||
}
|
||||
return checks
|
||||
}
|
||||
53
client/consul_test.go
Normal file
53
client/consul_test.go
Normal file
@@ -0,0 +1,53 @@
|
||||
package client
|
||||
|
||||
import (
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
"log"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestMakeChecks(t *testing.T) {
|
||||
service := &structs.Service{
|
||||
Id: "Foo",
|
||||
Name: "Bar",
|
||||
Checks: []structs.ServiceCheck{
|
||||
{
|
||||
Type: "http",
|
||||
Path: "/foo/bar",
|
||||
Interval: 10 * time.Second,
|
||||
Timeout: 2 * time.Second,
|
||||
},
|
||||
{
|
||||
Type: "http",
|
||||
Protocol: "https",
|
||||
Path: "/foo/bar",
|
||||
Interval: 10 * time.Second,
|
||||
Timeout: 2 * time.Second,
|
||||
},
|
||||
{
|
||||
Type: "tcp",
|
||||
Interval: 10 * time.Second,
|
||||
Timeout: 2 * time.Second,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
logger := log.New(os.Stdout, "logger: ", log.Lshortfile)
|
||||
|
||||
c, _ := NewConsulClient(logger, "")
|
||||
checks := c.makeChecks(service, "10.10.0.1", 8090)
|
||||
|
||||
if checks[0].HTTP != "http://10.10.0.1:8090/foo/bar" {
|
||||
t.Fatalf("Invalid http url for check: %v", checks[0].HTTP)
|
||||
}
|
||||
|
||||
if checks[1].HTTP != "https://10.10.0.1:8090/foo/bar" {
|
||||
t.Fatalf("Invalid http url for check: %v", checks[0].HTTP)
|
||||
}
|
||||
|
||||
if checks[2].TCP != "10.10.0.1:8090" {
|
||||
t.Fatalf("Invalid tcp check: %v", checks[0].TCP)
|
||||
}
|
||||
}
|
||||
@@ -1,11 +1,6 @@
|
||||
package args
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"regexp"
|
||||
|
||||
"github.com/mattn/go-shellwords"
|
||||
)
|
||||
import "regexp"
|
||||
|
||||
var (
|
||||
envRe = regexp.MustCompile(`\$({[a-zA-Z0-9_]+}|[a-zA-Z0-9_]+)`)
|
||||
@@ -13,27 +8,17 @@ var (
|
||||
|
||||
// ParseAndReplace takes the user supplied args and a map of environment
|
||||
// variables. It replaces any instance of an environment variable in the args
|
||||
// with the actual value and does correct splitting of the arg list.
|
||||
func ParseAndReplace(args string, env map[string]string) ([]string, error) {
|
||||
// Set up parser.
|
||||
p := shellwords.NewParser()
|
||||
p.ParseEnv = false
|
||||
p.ParseBacktick = false
|
||||
|
||||
parsed, err := p.Parse(args)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Couldn't parse args %v: %v", args, err)
|
||||
}
|
||||
|
||||
replaced := make([]string, len(parsed))
|
||||
for i, arg := range parsed {
|
||||
// with the actual value.
|
||||
func ParseAndReplace(args []string, env map[string]string) []string {
|
||||
replaced := make([]string, len(args))
|
||||
for i, arg := range args {
|
||||
replaced[i] = ReplaceEnv(arg, env)
|
||||
}
|
||||
|
||||
return replaced, nil
|
||||
return replaced
|
||||
}
|
||||
|
||||
// replaceEnv takes an arg and replaces all occurences of environment variables.
|
||||
// ReplaceEnv takes an arg and replaces all occurences of environment variables.
|
||||
// If the variable is found in the passed map it is replaced, otherwise the
|
||||
// original string is returned.
|
||||
func ReplaceEnv(arg string, env map[string]string) string {
|
||||
|
||||
@@ -21,12 +21,9 @@ var (
|
||||
)
|
||||
|
||||
func TestDriverArgs_ParseAndReplaceInvalidEnv(t *testing.T) {
|
||||
input := "invalid $FOO"
|
||||
input := []string{"invalid", "$FOO"}
|
||||
exp := []string{"invalid", "$FOO"}
|
||||
act, err := ParseAndReplace(input, envVars)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to parse valid input args %v: %v", input, err)
|
||||
}
|
||||
act := ParseAndReplace(input, envVars)
|
||||
|
||||
if !reflect.DeepEqual(act, exp) {
|
||||
t.Fatalf("ParseAndReplace(%v, %v) returned %#v; want %#v", input, envVars, act, exp)
|
||||
@@ -34,12 +31,9 @@ func TestDriverArgs_ParseAndReplaceInvalidEnv(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestDriverArgs_ParseAndReplaceValidEnv(t *testing.T) {
|
||||
input := fmt.Sprintf("nomad_ip \\\"$%v\\\"!", ipKey)
|
||||
input := []string{"nomad_ip", fmt.Sprintf(`"$%v"!`, ipKey)}
|
||||
exp := []string{"nomad_ip", fmt.Sprintf("\"%s\"!", ipVal)}
|
||||
act, err := ParseAndReplace(input, envVars)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to parse valid input args %v: %v", input, err)
|
||||
}
|
||||
act := ParseAndReplace(input, envVars)
|
||||
|
||||
if !reflect.DeepEqual(act, exp) {
|
||||
t.Fatalf("ParseAndReplace(%v, %v) returned %#v; want %#v", input, envVars, act, exp)
|
||||
@@ -47,32 +41,9 @@ func TestDriverArgs_ParseAndReplaceValidEnv(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestDriverArgs_ParseAndReplaceChainedEnv(t *testing.T) {
|
||||
input := fmt.Sprintf("-foo $%s$%s", ipKey, portKey)
|
||||
input := []string{"-foo", fmt.Sprintf("$%s$%s", ipKey, portKey)}
|
||||
exp := []string{"-foo", fmt.Sprintf("%s%s", ipVal, portVal)}
|
||||
act, err := ParseAndReplace(input, envVars)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to parse valid input args %v: %v", input, err)
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(act, exp) {
|
||||
t.Fatalf("ParseAndReplace(%v, %v) returned %#v; want %#v", input, envVars, act, exp)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDriverArgs_ParseAndReplaceInvalidArgEscape(t *testing.T) {
|
||||
input := "-c \"echo \"foo\\\" > bar.txt\""
|
||||
if _, err := ParseAndReplace(input, envVars); err == nil {
|
||||
t.Fatalf("ParseAndReplace(%v, %v) should have failed", input, envVars)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDriverArgs_ParseAndReplaceValidArgEscape(t *testing.T) {
|
||||
input := "-c \"echo \\\"foo\\\" > bar.txt\""
|
||||
exp := []string{"-c", "echo \"foo\" > bar.txt"}
|
||||
act, err := ParseAndReplace(input, envVars)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to parse valid input args %v: %v", input, err)
|
||||
}
|
||||
act := ParseAndReplace(input, envVars)
|
||||
|
||||
if !reflect.DeepEqual(act, exp) {
|
||||
t.Fatalf("ParseAndReplace(%v, %v) returned %#v; want %#v", input, envVars, act, exp)
|
||||
|
||||
@@ -35,7 +35,7 @@ type DockerDriverAuth struct {
|
||||
type DockerDriverConfig struct {
|
||||
ImageName string `mapstructure:"image"` // Container's Image Name
|
||||
Command string `mapstructure:"command"` // The Command/Entrypoint to run when the container starts up
|
||||
Args string `mapstructure:"args"` // The arguments to the Command/Entrypoint
|
||||
Args []string `mapstructure:"args"` // The arguments to the Command/Entrypoint
|
||||
NetworkMode string `mapstructure:"network_mode"` // The network mode of the container - host, net and none
|
||||
PortMap []map[string]int `mapstructure:"port_map"` // A map of host port labels and the ports exposed on the container
|
||||
Privileged bool `mapstructure:"privileged"` // Flag to run the container in priviledged mode
|
||||
@@ -293,21 +293,18 @@ func (d *DockerDriver) createContainer(ctx *ExecContext, task *structs.Task, dri
|
||||
config.ExposedPorts = exposedPorts
|
||||
}
|
||||
|
||||
parsedArgs, err := args.ParseAndReplace(driverConfig.Args, env.Map())
|
||||
if err != nil {
|
||||
return c, err
|
||||
}
|
||||
parsedArgs := args.ParseAndReplace(driverConfig.Args, env.Map())
|
||||
|
||||
// If the user specified a custom command to run as their entrypoint, we'll
|
||||
// inject it here.
|
||||
if driverConfig.Command != "" {
|
||||
cmd := []string{driverConfig.Command}
|
||||
if driverConfig.Args != "" {
|
||||
if len(driverConfig.Args) != 0 {
|
||||
cmd = append(cmd, parsedArgs...)
|
||||
}
|
||||
d.logger.Printf("[DEBUG] driver.docker: setting container startup command to: %s", strings.Join(cmd, " "))
|
||||
config.Cmd = cmd
|
||||
} else if driverConfig.Args != "" {
|
||||
} else if len(driverConfig.Args) != 0 {
|
||||
d.logger.Println("[DEBUG] driver.docker: ignoring command arguments because command is not specified")
|
||||
}
|
||||
|
||||
@@ -431,7 +428,7 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle
|
||||
|
||||
if len(containers) != 1 {
|
||||
log.Printf("[ERR] driver.docker: failed to get id for container %s", config.Name)
|
||||
return nil, fmt.Errorf("Failed to get id for container %s", config.Name, err)
|
||||
return nil, fmt.Errorf("Failed to get id for container %s", config.Name)
|
||||
}
|
||||
|
||||
log.Printf("[INFO] driver.docker: a container with the name %s already exists; will attempt to purge and re-create", config.Name)
|
||||
|
||||
@@ -137,7 +137,7 @@ func TestDockerDriver_Start_Wait(t *testing.T) {
|
||||
Config: map[string]interface{}{
|
||||
"image": "redis",
|
||||
"command": "redis-server",
|
||||
"args": "-v",
|
||||
"args": []string{"-v"},
|
||||
},
|
||||
Resources: &structs.Resources{
|
||||
MemoryMB: 256,
|
||||
@@ -190,7 +190,11 @@ func TestDockerDriver_Start_Wait_AllocDir(t *testing.T) {
|
||||
Config: map[string]interface{}{
|
||||
"image": "redis",
|
||||
"command": "/bin/bash",
|
||||
"args": fmt.Sprintf(`-c "sleep 1; echo -n %s > $%s/%s"`, string(exp), environment.AllocDir, file),
|
||||
"args": []string{
|
||||
"-c",
|
||||
fmt.Sprintf(`sleep 1; echo -n %s > $%s/%s`,
|
||||
string(exp), environment.AllocDir, file),
|
||||
},
|
||||
},
|
||||
Resources: &structs.Resources{
|
||||
MemoryMB: 256,
|
||||
@@ -243,7 +247,7 @@ func TestDockerDriver_Start_Kill_Wait(t *testing.T) {
|
||||
Config: map[string]interface{}{
|
||||
"image": "redis",
|
||||
"command": "/bin/sleep",
|
||||
"args": "10",
|
||||
"args": []string{"10"},
|
||||
},
|
||||
Resources: basicResources,
|
||||
}
|
||||
|
||||
@@ -24,10 +24,10 @@ type ExecDriver struct {
|
||||
fingerprint.StaticFingerprinter
|
||||
}
|
||||
type ExecDriverConfig struct {
|
||||
ArtifactSource string `mapstructure:"artifact_source"`
|
||||
Checksum string `mapstructure:"checksum"`
|
||||
Command string `mapstructure:"command"`
|
||||
Args string `mapstructure:"args"`
|
||||
ArtifactSource string `mapstructure:"artifact_source"`
|
||||
Checksum string `mapstructure:"checksum"`
|
||||
Command string `mapstructure:"command"`
|
||||
Args []string `mapstructure:"args"`
|
||||
}
|
||||
|
||||
// execHandle is returned from Start/Open as a handle to the PID
|
||||
@@ -91,14 +91,8 @@ func (d *ExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
|
||||
// Get the environment variables.
|
||||
envVars := TaskEnvironmentVariables(ctx, task)
|
||||
|
||||
// Look for arguments
|
||||
var args []string
|
||||
if driverConfig.Args != "" {
|
||||
args = append(args, driverConfig.Args)
|
||||
}
|
||||
|
||||
// Setup the command
|
||||
cmd := executor.Command(command, args...)
|
||||
cmd := executor.Command(command, driverConfig.Args...)
|
||||
if err := cmd.Limit(task.Resources); err != nil {
|
||||
return nil, fmt.Errorf("failed to constrain resources: %s", err)
|
||||
}
|
||||
|
||||
@@ -39,7 +39,7 @@ func TestExecDriver_StartOpen_Wait(t *testing.T) {
|
||||
Name: "sleep",
|
||||
Config: map[string]interface{}{
|
||||
"command": "/bin/sleep",
|
||||
"args": "5",
|
||||
"args": []string{"5"},
|
||||
},
|
||||
Resources: basicResources,
|
||||
}
|
||||
@@ -73,7 +73,7 @@ func TestExecDriver_Start_Wait(t *testing.T) {
|
||||
Name: "sleep",
|
||||
Config: map[string]interface{}{
|
||||
"command": "/bin/sleep",
|
||||
"args": "2",
|
||||
"args": []string{"2"},
|
||||
},
|
||||
Resources: basicResources,
|
||||
}
|
||||
@@ -161,7 +161,10 @@ func TestExecDriver_Start_Artifact_expanded(t *testing.T) {
|
||||
Config: map[string]interface{}{
|
||||
"artifact_source": fmt.Sprintf("https://dl.dropboxusercontent.com/u/47675/jar_thing/%s", file),
|
||||
"command": "/bin/bash",
|
||||
"args": fmt.Sprintf("-c '/bin/sleep 1 && %s'", filepath.Join("$NOMAD_TASK_DIR", file)),
|
||||
"args": []string{
|
||||
"-c",
|
||||
fmt.Sprintf(`/bin/sleep 1 && %s`, filepath.Join("$NOMAD_TASK_DIR", file)),
|
||||
},
|
||||
},
|
||||
Resources: basicResources,
|
||||
}
|
||||
@@ -204,7 +207,10 @@ func TestExecDriver_Start_Wait_AllocDir(t *testing.T) {
|
||||
Name: "sleep",
|
||||
Config: map[string]interface{}{
|
||||
"command": "/bin/bash",
|
||||
"args": fmt.Sprintf("-c \"sleep 1; echo -n %s > $%s/%s\"", string(exp), environment.AllocDir, file),
|
||||
"args": []string{
|
||||
"-c",
|
||||
fmt.Sprintf(`sleep 1; echo -n %s > $%s/%s`, string(exp), environment.AllocDir, file),
|
||||
},
|
||||
},
|
||||
Resources: basicResources,
|
||||
}
|
||||
@@ -250,7 +256,7 @@ func TestExecDriver_Start_Kill_Wait(t *testing.T) {
|
||||
Name: "sleep",
|
||||
Config: map[string]interface{}{
|
||||
"command": "/bin/sleep",
|
||||
"args": "1",
|
||||
"args": []string{"1"},
|
||||
},
|
||||
Resources: basicResources,
|
||||
}
|
||||
|
||||
@@ -29,7 +29,6 @@ type BasicExecutor struct {
|
||||
allocDir string
|
||||
}
|
||||
|
||||
// TODO: Have raw_exec use this as well.
|
||||
func NewBasicExecutor() Executor {
|
||||
return &BasicExecutor{}
|
||||
}
|
||||
@@ -63,12 +62,7 @@ func (e *BasicExecutor) Start() error {
|
||||
}
|
||||
|
||||
e.cmd.Path = args.ReplaceEnv(e.cmd.Path, envVars.Map())
|
||||
combined := strings.Join(e.cmd.Args, " ")
|
||||
parsed, err := args.ParseAndReplace(combined, envVars.Map())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
e.cmd.Args = parsed
|
||||
e.cmd.Args = args.ParseAndReplace(e.cmd.Args, envVars.Map())
|
||||
|
||||
spawnState := filepath.Join(e.allocDir, fmt.Sprintf("%s_%s", e.taskName, "exit_status"))
|
||||
e.spawn = spawn.NewSpawner(spawnState)
|
||||
|
||||
@@ -167,12 +167,7 @@ func (e *LinuxExecutor) Start() error {
|
||||
}
|
||||
|
||||
e.cmd.Path = args.ReplaceEnv(e.cmd.Path, envVars.Map())
|
||||
combined := strings.Join(e.cmd.Args, " ")
|
||||
parsed, err := args.ParseAndReplace(combined, envVars.Map())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
e.cmd.Args = parsed
|
||||
e.cmd.Args = args.ParseAndReplace(e.cmd.Args, envVars.Map())
|
||||
|
||||
spawnState := filepath.Join(e.allocDir, fmt.Sprintf("%s_%s", e.taskName, "exit_status"))
|
||||
e.spawn = spawn.NewSpawner(spawnState)
|
||||
|
||||
@@ -112,7 +112,7 @@ func Executor_Start_Wait(t *testing.T, command buildExecCommand) {
|
||||
expected := "hello world"
|
||||
file := filepath.Join(allocdir.TaskLocal, "output.txt")
|
||||
absFilePath := filepath.Join(taskDir, file)
|
||||
cmd := fmt.Sprintf(`"%v \"%v\" > %v"`, "/bin/sleep 1 ; echo -n", expected, file)
|
||||
cmd := fmt.Sprintf(`/bin/sleep 1 ; echo -n %v > %v`, expected, file)
|
||||
e := command("/bin/bash", "-c", cmd)
|
||||
|
||||
if err := e.Limit(constraint); err != nil {
|
||||
@@ -190,7 +190,7 @@ func Executor_Open(t *testing.T, command buildExecCommand, newExecutor func() Ex
|
||||
expected := "hello world"
|
||||
file := filepath.Join(allocdir.TaskLocal, "output.txt")
|
||||
absFilePath := filepath.Join(taskDir, file)
|
||||
cmd := fmt.Sprintf(`"%v \"%v\" > %v"`, "/bin/sleep 1 ; echo -n", expected, file)
|
||||
cmd := fmt.Sprintf(`/bin/sleep 1 ; echo -n %v > %v`, expected, file)
|
||||
e := command("/bin/bash", "-c", cmd)
|
||||
|
||||
if err := e.Limit(constraint); err != nil {
|
||||
@@ -28,10 +28,10 @@ type JavaDriver struct {
|
||||
}
|
||||
|
||||
type JavaDriverConfig struct {
|
||||
JvmOpts string `mapstructure:"jvm_options"`
|
||||
ArtifactSource string `mapstructure:"artifact_source"`
|
||||
Checksum string `mapstructure:"checksum"`
|
||||
Args string `mapstructure:"args"`
|
||||
JvmOpts []string `mapstructure:"jvm_options"`
|
||||
ArtifactSource string `mapstructure:"artifact_source"`
|
||||
Checksum string `mapstructure:"checksum"`
|
||||
Args []string `mapstructure:"args"`
|
||||
}
|
||||
|
||||
// javaHandle is returned from Start/Open as a handle to the PID
|
||||
@@ -126,15 +126,15 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
|
||||
|
||||
args := []string{}
|
||||
// Look for jvm options
|
||||
if driverConfig.JvmOpts != "" {
|
||||
if len(driverConfig.JvmOpts) != 0 {
|
||||
d.logger.Printf("[DEBUG] driver.java: found JVM options: %s", driverConfig.JvmOpts)
|
||||
args = append(args, driverConfig.JvmOpts)
|
||||
args = append(args, driverConfig.JvmOpts...)
|
||||
}
|
||||
|
||||
// Build the argument list.
|
||||
args = append(args, "-jar", filepath.Join(allocdir.TaskLocal, jarName))
|
||||
if driverConfig.Args != "" {
|
||||
args = append(args, driverConfig.Args)
|
||||
if len(driverConfig.Args) != 0 {
|
||||
args = append(args, driverConfig.Args...)
|
||||
}
|
||||
|
||||
// Setup the command
|
||||
|
||||
@@ -51,7 +51,7 @@ func TestJavaDriver_StartOpen_Wait(t *testing.T) {
|
||||
Name: "demo-app",
|
||||
Config: map[string]interface{}{
|
||||
"artifact_source": "https://dl.dropboxusercontent.com/u/47675/jar_thing/demoapp.jar",
|
||||
"jvm_options": "-Xmx2048m -Xms256m",
|
||||
"jvm_options": []string{"-Xmx64m", "-Xms32m"},
|
||||
"checksum": "sha256:58d6e8130308d32e197c5108edd4f56ddf1417408f743097c2e662df0f0b17c8",
|
||||
},
|
||||
Resources: basicResources,
|
||||
@@ -97,7 +97,6 @@ func TestJavaDriver_Start_Wait(t *testing.T) {
|
||||
Name: "demo-app",
|
||||
Config: map[string]interface{}{
|
||||
"artifact_source": "https://dl.dropboxusercontent.com/u/47675/jar_thing/demoapp.jar",
|
||||
"jvm_options": "-Xmx2048m -Xms256m",
|
||||
"checksum": "sha256:58d6e8130308d32e197c5108edd4f56ddf1417408f743097c2e662df0f0b17c8",
|
||||
},
|
||||
Resources: basicResources,
|
||||
|
||||
@@ -93,15 +93,9 @@ func (d *RawExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandl
|
||||
// Get the environment variables.
|
||||
envVars := TaskEnvironmentVariables(ctx, task)
|
||||
|
||||
// Look for arguments
|
||||
var args []string
|
||||
if driverConfig.Args != "" {
|
||||
args = append(args, driverConfig.Args)
|
||||
}
|
||||
|
||||
// Setup the command
|
||||
cmd := executor.NewBasicExecutor()
|
||||
executor.SetCommand(cmd, command, args)
|
||||
executor.SetCommand(cmd, command, driverConfig.Args)
|
||||
if err := cmd.Limit(task.Resources); err != nil {
|
||||
return nil, fmt.Errorf("failed to constrain resources: %s", err)
|
||||
}
|
||||
|
||||
@@ -53,7 +53,7 @@ func TestRawExecDriver_StartOpen_Wait(t *testing.T) {
|
||||
Name: "sleep",
|
||||
Config: map[string]interface{}{
|
||||
"command": "/bin/sleep",
|
||||
"args": "1",
|
||||
"args": []string{"1"},
|
||||
},
|
||||
Resources: basicResources,
|
||||
}
|
||||
@@ -151,7 +151,10 @@ func TestRawExecDriver_Start_Artifact_expanded(t *testing.T) {
|
||||
Config: map[string]interface{}{
|
||||
"artifact_source": fmt.Sprintf("https://dl.dropboxusercontent.com/u/47675/jar_thing/%s", file),
|
||||
"command": "/bin/bash",
|
||||
"args": fmt.Sprintf("-c '/bin/sleep 1 && %s'", filepath.Join("$NOMAD_TASK_DIR", file)),
|
||||
"args": []string{
|
||||
"-c",
|
||||
fmt.Sprintf(`'/bin/sleep 1 && %s'`, filepath.Join("$NOMAD_TASK_DIR", file)),
|
||||
},
|
||||
},
|
||||
Resources: basicResources,
|
||||
}
|
||||
@@ -190,7 +193,7 @@ func TestRawExecDriver_Start_Wait(t *testing.T) {
|
||||
Name: "sleep",
|
||||
Config: map[string]interface{}{
|
||||
"command": "/bin/sleep",
|
||||
"args": "1",
|
||||
"args": []string{"1"},
|
||||
},
|
||||
Resources: basicResources,
|
||||
}
|
||||
@@ -232,7 +235,10 @@ func TestRawExecDriver_Start_Wait_AllocDir(t *testing.T) {
|
||||
Name: "sleep",
|
||||
Config: map[string]interface{}{
|
||||
"command": "/bin/bash",
|
||||
"args": fmt.Sprintf(`-c "sleep 1; echo -n %s > $%s/%s"`, string(exp), environment.AllocDir, file),
|
||||
"args": []string{
|
||||
"-c",
|
||||
fmt.Sprintf(`sleep 1; echo -n %s > $%s/%s`, string(exp), environment.AllocDir, file),
|
||||
},
|
||||
},
|
||||
Resources: basicResources,
|
||||
}
|
||||
@@ -277,7 +283,7 @@ func TestRawExecDriver_Start_Kill_Wait(t *testing.T) {
|
||||
Name: "sleep",
|
||||
Config: map[string]interface{}{
|
||||
"command": "/bin/sleep",
|
||||
"args": "1",
|
||||
"args": []string{"1"},
|
||||
},
|
||||
Resources: basicResources,
|
||||
}
|
||||
|
||||
@@ -37,8 +37,8 @@ type RktDriver struct {
|
||||
}
|
||||
|
||||
type RktDriverConfig struct {
|
||||
ImageName string `mapstructure:"image"`
|
||||
Args string `mapstructure:"args"`
|
||||
ImageName string `mapstructure:"image"`
|
||||
Args []string `mapstructure:"args"`
|
||||
}
|
||||
|
||||
// rktHandle is returned from Start/Open as a handle to the PID
|
||||
@@ -150,11 +150,8 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, e
|
||||
}
|
||||
|
||||
// Add user passed arguments.
|
||||
if driverConfig.Args != "" {
|
||||
parsed, err := args.ParseAndReplace(driverConfig.Args, envVars.Map())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(driverConfig.Args) != 0 {
|
||||
parsed := args.ParseAndReplace(driverConfig.Args, envVars.Map())
|
||||
|
||||
// Need to start arguments with "--"
|
||||
if len(parsed) > 0 {
|
||||
|
||||
@@ -119,7 +119,7 @@ func TestRktDriver_Start_Wait(t *testing.T) {
|
||||
"trust_prefix": "coreos.com/etcd",
|
||||
"image": "coreos.com/etcd:v2.0.4",
|
||||
"command": "/etcd",
|
||||
"args": "--version",
|
||||
"args": []string{"--version"},
|
||||
},
|
||||
}
|
||||
|
||||
@@ -160,7 +160,7 @@ func TestRktDriver_Start_Wait_Skip_Trust(t *testing.T) {
|
||||
Config: map[string]interface{}{
|
||||
"image": "coreos.com/etcd:v2.0.4",
|
||||
"command": "/etcd",
|
||||
"args": "--version",
|
||||
"args": []string{"--version"},
|
||||
},
|
||||
}
|
||||
|
||||
@@ -202,7 +202,7 @@ func TestRktDriver_Start_Wait_Logs(t *testing.T) {
|
||||
"trust_prefix": "coreos.com/etcd",
|
||||
"image": "coreos.com/etcd:v2.0.4",
|
||||
"command": "/etcd",
|
||||
"args": "--version",
|
||||
"args": []string{"--version"},
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
@@ -25,6 +25,7 @@ type TaskRunner struct {
|
||||
ctx *driver.ExecContext
|
||||
allocID string
|
||||
restartTracker restartTracker
|
||||
consulClient *ConsulClient
|
||||
|
||||
task *structs.Task
|
||||
state *structs.TaskState
|
||||
@@ -52,13 +53,14 @@ type TaskStateUpdater func(taskName string)
|
||||
func NewTaskRunner(logger *log.Logger, config *config.Config,
|
||||
updater TaskStateUpdater, ctx *driver.ExecContext,
|
||||
allocID string, task *structs.Task, state *structs.TaskState,
|
||||
restartTracker restartTracker) *TaskRunner {
|
||||
restartTracker restartTracker, consulClient *ConsulClient) *TaskRunner {
|
||||
|
||||
tc := &TaskRunner{
|
||||
config: config,
|
||||
updater: updater,
|
||||
logger: logger,
|
||||
restartTracker: restartTracker,
|
||||
consulClient: consulClient,
|
||||
ctx: ctx,
|
||||
allocID: allocID,
|
||||
task: task,
|
||||
@@ -231,6 +233,12 @@ func (r *TaskRunner) run() {
|
||||
var destroyErr error
|
||||
destroyed := false
|
||||
|
||||
// Register the services defined by the task with Consil
|
||||
r.consulClient.Register(r.task, r.allocID)
|
||||
|
||||
// De-Register the services belonging to the task from consul
|
||||
defer r.consulClient.Deregister(r.task)
|
||||
|
||||
OUTER:
|
||||
// Wait for updates
|
||||
for {
|
||||
@@ -303,6 +311,7 @@ func (r *TaskRunner) run() {
|
||||
// Set force start because we are restarting the task.
|
||||
forceStart = true
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
@@ -32,7 +32,7 @@ func testTaskRunner(restarts bool) (*MockTaskStateUpdater, *TaskRunner) {
|
||||
upd := &MockTaskStateUpdater{}
|
||||
alloc := mock.Alloc()
|
||||
task := alloc.Job.TaskGroups[0].Tasks[0]
|
||||
|
||||
consulClient, _ := NewConsulClient(logger, "127.0.0.1:8500")
|
||||
// Initialize the port listing. This should be done by the offer process but
|
||||
// we have a mock so that doesn't happen.
|
||||
task.Resources.Networks[0].ReservedPorts = []structs.Port{{"", 80}}
|
||||
@@ -48,7 +48,7 @@ func testTaskRunner(restarts bool) (*MockTaskStateUpdater, *TaskRunner) {
|
||||
}
|
||||
|
||||
state := alloc.TaskStates[task.Name]
|
||||
tr := NewTaskRunner(logger, conf, upd.Update, ctx, alloc.ID, task, state, restartTracker)
|
||||
tr := NewTaskRunner(logger, conf, upd.Update, ctx, alloc.ID, task, state, restartTracker, consulClient)
|
||||
return upd, tr
|
||||
}
|
||||
|
||||
@@ -89,7 +89,7 @@ func TestTaskRunner_Destroy(t *testing.T) {
|
||||
|
||||
// Change command to ensure we run for a bit
|
||||
tr.task.Config["command"] = "/bin/sleep"
|
||||
tr.task.Config["args"] = "10"
|
||||
tr.task.Config["args"] = []string{"10"}
|
||||
go tr.Run()
|
||||
|
||||
// Begin the tear down
|
||||
@@ -128,7 +128,7 @@ func TestTaskRunner_Update(t *testing.T) {
|
||||
|
||||
// Change command to ensure we run for a bit
|
||||
tr.task.Config["command"] = "/bin/sleep"
|
||||
tr.task.Config["args"] = "10"
|
||||
tr.task.Config["args"] = []string{"10"}
|
||||
go tr.Run()
|
||||
defer tr.Destroy()
|
||||
defer tr.ctx.AllocDir.Destroy()
|
||||
@@ -153,7 +153,7 @@ func TestTaskRunner_SaveRestoreState(t *testing.T) {
|
||||
|
||||
// Change command to ensure we run for a bit
|
||||
tr.task.Config["command"] = "/bin/sleep"
|
||||
tr.task.Config["args"] = "10"
|
||||
tr.task.Config["args"] = []string{"10"}
|
||||
go tr.Run()
|
||||
defer tr.Destroy()
|
||||
|
||||
@@ -164,8 +164,10 @@ func TestTaskRunner_SaveRestoreState(t *testing.T) {
|
||||
}
|
||||
|
||||
// Create a new task runner
|
||||
consulClient, _ := NewConsulClient(tr.logger, "127.0.0.1:8500")
|
||||
tr2 := NewTaskRunner(tr.logger, tr.config, upd.Update,
|
||||
tr.ctx, tr.allocID, &structs.Task{Name: tr.task.Name}, tr.state, tr.restartTracker)
|
||||
tr.ctx, tr.allocID, &structs.Task{Name: tr.task.Name}, tr.state, tr.restartTracker,
|
||||
consulClient)
|
||||
if err := tr2.RestoreState(); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
@@ -124,6 +124,7 @@ func (c *RunCommand) Run(args []string) int {
|
||||
// This function is just a hammer and probably needs to be revisited.
|
||||
func convertJob(in *structs.Job) (*api.Job, error) {
|
||||
gob.Register([]map[string]interface{}{})
|
||||
gob.Register([]interface{}{})
|
||||
var apiJob *api.Job
|
||||
buf := new(bytes.Buffer)
|
||||
if err := gob.NewEncoder(buf).Encode(in); err != nil {
|
||||
|
||||
@@ -463,7 +463,7 @@ func parseTasks(jobName string, taskGroupName string, result *[]*structs.Task, l
|
||||
}
|
||||
|
||||
func parseServices(jobName string, taskGroupName string, task *structs.Task, serviceObjs *ast.ObjectList) error {
|
||||
task.Services = make([]structs.Service, len(serviceObjs.Items))
|
||||
task.Services = make([]*structs.Service, len(serviceObjs.Items))
|
||||
var defaultServiceName bool
|
||||
for idx, o := range serviceObjs.Items {
|
||||
var service structs.Service
|
||||
@@ -503,7 +503,7 @@ func parseServices(jobName string, taskGroupName string, task *structs.Task, ser
|
||||
}
|
||||
}
|
||||
|
||||
task.Services[idx] = service
|
||||
task.Services[idx] = &service
|
||||
}
|
||||
|
||||
return nil
|
||||
|
||||
@@ -94,7 +94,7 @@ func TestParse(t *testing.T) {
|
||||
Config: map[string]interface{}{
|
||||
"image": "hashicorp/binstore",
|
||||
},
|
||||
Services: []structs.Service{
|
||||
Services: []*structs.Service{
|
||||
{
|
||||
Id: "",
|
||||
Name: "binstore-storagelocker-binsl-binstore",
|
||||
|
||||
11
nomad/fsm.go
11
nomad/fsm.go
@@ -13,13 +13,6 @@ import (
|
||||
"github.com/hashicorp/raft"
|
||||
)
|
||||
|
||||
var (
|
||||
msgpackHandle = &codec.MsgpackHandle{
|
||||
RawToString: true,
|
||||
WriteExt: true,
|
||||
}
|
||||
)
|
||||
|
||||
const (
|
||||
// timeTableGranularity is the granularity of index to time tracking
|
||||
timeTableGranularity = 5 * time.Minute
|
||||
@@ -328,7 +321,7 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error {
|
||||
defer restore.Abort()
|
||||
|
||||
// Create a decoder
|
||||
dec := codec.NewDecoder(old, msgpackHandle)
|
||||
dec := codec.NewDecoder(old, structs.MsgpackHandle)
|
||||
|
||||
// Read in the header
|
||||
var header snapshotHeader
|
||||
@@ -412,7 +405,7 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error {
|
||||
func (s *nomadSnapshot) Persist(sink raft.SnapshotSink) error {
|
||||
defer metrics.MeasureSince([]string{"nomad", "fsm", "persist"}, time.Now())
|
||||
// Register the nodes
|
||||
encoder := codec.NewEncoder(sink, msgpackHandle)
|
||||
encoder := codec.NewEncoder(sink, structs.MsgpackHandle)
|
||||
|
||||
// Write the header
|
||||
header := snapshotHeader{}
|
||||
|
||||
@@ -86,7 +86,6 @@ func Job() *structs.Job {
|
||||
Driver: "exec",
|
||||
Config: map[string]interface{}{
|
||||
"command": "/bin/date",
|
||||
"args": "+%s",
|
||||
},
|
||||
Env: map[string]string{
|
||||
"FOO": "bar",
|
||||
@@ -151,7 +150,6 @@ func SystemJob() *structs.Job {
|
||||
Driver: "exec",
|
||||
Config: map[string]interface{}{
|
||||
"command": "/bin/date",
|
||||
"args": "+%s",
|
||||
},
|
||||
Resources: &structs.Resources{
|
||||
CPU: 500,
|
||||
|
||||
13
nomad/rpc.go
13
nomad/rpc.go
@@ -11,7 +11,6 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/armon/go-metrics"
|
||||
"github.com/hashicorp/go-msgpack/codec"
|
||||
"github.com/hashicorp/net-rpc-msgpackrpc"
|
||||
"github.com/hashicorp/nomad/nomad/state"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
@@ -53,24 +52,16 @@ const (
|
||||
enqueueLimit = 30 * time.Second
|
||||
)
|
||||
|
||||
var (
|
||||
// rpcHandle is the MsgpackHandle to be used by both Client and Server codecs.
|
||||
rpcHandle = &codec.MsgpackHandle{
|
||||
// Enables proper encoding of strings within nil interfaces.
|
||||
RawToString: true,
|
||||
}
|
||||
)
|
||||
|
||||
// NewClientCodec returns a new rpc.ClientCodec to be used to make RPC calls to
|
||||
// the Nomad Server.
|
||||
func NewClientCodec(conn io.ReadWriteCloser) rpc.ClientCodec {
|
||||
return msgpackrpc.NewCodecFromHandle(true, true, conn, rpcHandle)
|
||||
return msgpackrpc.NewCodecFromHandle(true, true, conn, structs.MsgpackHandle)
|
||||
}
|
||||
|
||||
// NewServerCodec returns a new rpc.ServerCodec to be used by the Nomad Server
|
||||
// to handle rpcs.
|
||||
func NewServerCodec(conn io.ReadWriteCloser) rpc.ServerCodec {
|
||||
return msgpackrpc.NewCodecFromHandle(true, true, conn, rpcHandle)
|
||||
return msgpackrpc.NewCodecFromHandle(true, true, conn, structs.MsgpackHandle)
|
||||
}
|
||||
|
||||
// listen is used to listen for incoming RPC connections
|
||||
|
||||
@@ -1009,7 +1009,7 @@ type ServiceCheck struct {
|
||||
Name string // Name of the check, defaults to id
|
||||
Type string // Type of the check - tcp, http, docker and script
|
||||
Script string // Script to invoke for script check
|
||||
Http string // path of the health check url for http type check
|
||||
Path string // path of the health check url for http type check
|
||||
Protocol string // Protocol to use if check is http, defaults to http
|
||||
Interval time.Duration // Interval of the check
|
||||
Timeout time.Duration // Timeout of the response from the check before consul fails the check
|
||||
@@ -1017,16 +1017,16 @@ type ServiceCheck struct {
|
||||
|
||||
func (sc *ServiceCheck) Validate() error {
|
||||
t := strings.ToLower(sc.Type)
|
||||
if sc.Type == ServiceCheckHTTP && sc.Http == "" {
|
||||
if t != ServiceCheckTCP && t != ServiceCheckHTTP {
|
||||
return fmt.Errorf("Check with name %v has invalid check type: %s ", sc.Name, sc.Type)
|
||||
}
|
||||
if sc.Type == ServiceCheckHTTP && sc.Path == "" {
|
||||
return fmt.Errorf("http checks needs the Http path information.")
|
||||
}
|
||||
|
||||
if sc.Type == ServiceCheckScript && sc.Script == "" {
|
||||
return fmt.Errorf("Script checks need the script to invoke")
|
||||
}
|
||||
if t != ServiceCheckTCP && t != ServiceCheckHTTP && t != ServiceCheckDocker && t != ServiceCheckScript {
|
||||
return fmt.Errorf("Check with name %v has invalid check type: %s ", sc.Name, sc.Type)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -1064,7 +1064,7 @@ type Task struct {
|
||||
Env map[string]string
|
||||
|
||||
// List of service definitions exposed by the Task
|
||||
Services []Service
|
||||
Services []*Service
|
||||
|
||||
// Constraints can be specified at a task level and apply only to
|
||||
// the particular task.
|
||||
@@ -1704,7 +1704,7 @@ func (p *PlanResult) FullCommit(plan *Plan) (bool, int, int) {
|
||||
}
|
||||
|
||||
// msgpackHandle is a shared handle for encoding/decoding of structs
|
||||
var msgpackHandle = func() *codec.MsgpackHandle {
|
||||
var MsgpackHandle = func() *codec.MsgpackHandle {
|
||||
h := &codec.MsgpackHandle{RawToString: true}
|
||||
|
||||
// Sets the default type for decoding a map into a nil interface{}.
|
||||
@@ -1716,13 +1716,13 @@ var msgpackHandle = func() *codec.MsgpackHandle {
|
||||
|
||||
// Decode is used to decode a MsgPack encoded object
|
||||
func Decode(buf []byte, out interface{}) error {
|
||||
return codec.NewDecoder(bytes.NewReader(buf), msgpackHandle).Decode(out)
|
||||
return codec.NewDecoder(bytes.NewReader(buf), MsgpackHandle).Decode(out)
|
||||
}
|
||||
|
||||
// Encode is used to encode a MsgPack object with type prefix
|
||||
func Encode(t MessageType, msg interface{}) ([]byte, error) {
|
||||
var buf bytes.Buffer
|
||||
buf.WriteByte(uint8(t))
|
||||
err := codec.NewEncoder(&buf, msgpackHandle).Encode(msg)
|
||||
err := codec.NewEncoder(&buf, MsgpackHandle).Encode(msg)
|
||||
return buf.Bytes(), err
|
||||
}
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/go-msgpack/codec"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
)
|
||||
|
||||
func TestTimeTable(t *testing.T) {
|
||||
@@ -104,14 +105,14 @@ func TestTimeTable_SerializeDeserialize(t *testing.T) {
|
||||
tt.Witness(50, plusHour)
|
||||
|
||||
var buf bytes.Buffer
|
||||
enc := codec.NewEncoder(&buf, msgpackHandle)
|
||||
enc := codec.NewEncoder(&buf, structs.MsgpackHandle)
|
||||
|
||||
err := tt.Serialize(enc)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
dec := codec.NewDecoder(&buf, msgpackHandle)
|
||||
dec := codec.NewDecoder(&buf, structs.MsgpackHandle)
|
||||
|
||||
tt2 := NewTimeTable(time.Second, time.Minute)
|
||||
err = tt2.Deserialize(dec)
|
||||
|
||||
@@ -7,6 +7,8 @@ GOOS=linux go get $DEP_ARGS github.com/docker/docker/pkg/units
|
||||
GOOS=linux go get $DEP_ARGS github.com/docker/docker/pkg/mount
|
||||
GOOS=linux go get $DEP_ARGS github.com/opencontainers/runc/libcontainer/cgroups/fs
|
||||
GOOS=linux go get $DEP_ARGS github.com/opencontainers/runc/libcontainer/configs
|
||||
GOOS=linux go get $DEP_ARGS github.com/coreos/go-systemd/util
|
||||
GOOS=linux go get $DEP_ARGS github.com/coreos/go-systemd/dbus
|
||||
|
||||
# Get the rest of the deps
|
||||
DEPS=$(go list -f '{{range .TestImports}}{{.}} {{end}}' ./...)
|
||||
|
||||
@@ -10,4 +10,4 @@ go build -o $TEMPDIR/nomad || exit 1
|
||||
|
||||
# Run the tests
|
||||
echo "--> Running tests"
|
||||
go list ./... | PATH=$TEMPDIR:$PATH xargs -n1 go test -cover -timeout=40s
|
||||
go list ./... | PATH=$TEMPDIR:$PATH xargs -n1 go test -cover -timeout=80s
|
||||
|
||||
@@ -5,9 +5,9 @@ var GitCommit string
|
||||
var GitDescribe string
|
||||
|
||||
// The main version number that is being run at the moment.
|
||||
const Version = "0.1.2"
|
||||
const Version = "0.2.0"
|
||||
|
||||
// A pre-release marker for the version. If this is "" (empty string)
|
||||
// then it means that it is a final release. Otherwise, this is a pre-release
|
||||
// such as "dev" (in development), "beta", "rc1", etc.
|
||||
const VersionPrerelease = ""
|
||||
const VersionPrerelease = "dev"
|
||||
|
||||
@@ -2,8 +2,8 @@ module CommandHelpers
|
||||
# Returns the markdown text for the general options usage.
|
||||
def general_options_usage()
|
||||
<<EOF
|
||||
* `-address=<addr>`: The address of the Nomad server. Overrides the "NOMAD_ADDR"
|
||||
environment variable if set. Defaults to "http://127.0.0.1:4646".
|
||||
* `-address=<addr>`: The address of the Nomad server. Overrides the `NOMAD_ADDR`
|
||||
environment variable if set. Defaults to `http://127.0.0.1:4646`.
|
||||
EOF
|
||||
end
|
||||
end
|
||||
|
||||
@@ -32,13 +32,13 @@ task "webservice" {
|
||||
|
||||
The following options are available for use in the job specification.
|
||||
|
||||
* `image` - (Required) The Docker image to run. The image may include a tag or
|
||||
* `image` - The Docker image to run. The image may include a tag or
|
||||
custom URL. By default it will be fetched from Docker Hub.
|
||||
|
||||
* `command` - (Optional) The command to run when starting the container.
|
||||
|
||||
* `args` - (Optional) Arguments to the optional `command`. If no `command` is
|
||||
present, `args` are ignored.
|
||||
* `args` - (Optional) A list of arguments to the optional `command`. If no
|
||||
`command` is present, `args` are ignored.
|
||||
|
||||
* `labels` - (Optional) A key/value map of labels to set to the containers on
|
||||
start.
|
||||
|
||||
@@ -20,15 +20,19 @@ scripts or other wrappers which provide higher level features.
|
||||
|
||||
The `exec` driver supports the following configuration in the job spec:
|
||||
|
||||
* `command` - (Required) The command to execute. Must be provided.
|
||||
* `artifact_source` – (Optional) Source location of an executable artifact. Must be accessible
|
||||
from the Nomad client. If you specify an `artifact_source` to be executed, you
|
||||
must reference it in the `command` as show in the examples below
|
||||
* `checksum` - **(Optional)** The checksum type and value for the `artifact_source` image.
|
||||
The format is `type:value`, where type is any of `md5`, `sha1`, `sha256`, or `sha512`,
|
||||
and the value is the computed checksum. If a checksum is supplied and does not
|
||||
match the downloaded artifact, the driver will fail to start
|
||||
* `args` - The argument list to the command, space seperated. Optional.
|
||||
* `command` - The command to execute. Must be provided.
|
||||
|
||||
* `artifact_source` – (Optional) Source location of an executable artifact. Must
|
||||
be accessible from the Nomad client. If you specify an `artifact_source` to be
|
||||
executed, you must reference it in the `command` as show in the examples below
|
||||
|
||||
* `checksum` - (Optional) The checksum type and value for the `artifact_source`
|
||||
image. The format is `type:value`, where type is any of `md5`, `sha1`,
|
||||
`sha256`, or `sha512`, and the value is the computed checksum. If a checksum
|
||||
is supplied and does not match the downloaded artifact, the driver will fail
|
||||
to start
|
||||
|
||||
* `args` - (Optional) A list of arguments to the `command`.
|
||||
|
||||
## Client Requirements
|
||||
|
||||
|
||||
@@ -18,17 +18,19 @@ HTTP from the Nomad client.
|
||||
|
||||
The `java` driver supports the following configuration in the job spec:
|
||||
|
||||
* `artifact_source` - **(Required)** The hosted location of the source Jar file. Must be accessible
|
||||
from the Nomad client
|
||||
* `checksum` - **(Optional)** The checksum type and value for the `artifact_source` image.
|
||||
The format is `type:value`, where type is any of `md5`, `sha1`, `sha256`, or `sha512`,
|
||||
and the value is the computed checksum. If a checksum is supplied and does not
|
||||
match the downloaded artifact, the driver will fail to start
|
||||
* `artifact_source` - The hosted location of the source Jar file. Must be
|
||||
accessible from the Nomad client
|
||||
|
||||
* `args` - **(Optional)** The argument list for the `java` command, space separated.
|
||||
* `checksum` - (Optional) The checksum type and value for the `artifact_source`
|
||||
image. The format is `type:value`, where type is any of `md5`, `sha1`,
|
||||
`sha256`, or `sha512`, and the value is the computed checksum. If a checksum
|
||||
is supplied and does not match the downloaded artifact, the driver will fail
|
||||
to start
|
||||
|
||||
* `jvm_options` - **(Optional)** JVM options to be passed while invoking java. These options
|
||||
are passed not validated in any way in Nomad.
|
||||
* `args` - (Optional) A list of arguments to the `java` command.
|
||||
|
||||
* `jvm_options` - (Optional) A list of JVM options to be passed while invoking
|
||||
java. These options are passed not validated in any way in Nomad.
|
||||
|
||||
## Client Requirements
|
||||
|
||||
|
||||
@@ -23,16 +23,19 @@ The `Qemu` driver can execute any regular `qemu` image (e.g. `qcow`, `img`,
|
||||
|
||||
The `Qemu` driver supports the following configuration in the job spec:
|
||||
|
||||
* `artifact_source` - **(Required)** The hosted location of the source Qemu image. Must be accessible
|
||||
* `artifact_source` - The hosted location of the source Qemu image. Must be accessible
|
||||
from the Nomad client, via HTTP.
|
||||
* `checksum` - **(Optional)** The checksum type and value for the `artifact_source` image.
|
||||
|
||||
* `checksum` - (Optional) The checksum type and value for the `artifact_source` image.
|
||||
The format is `type:value`, where type is any of `md5`, `sha1`, `sha256`, or `sha512`,
|
||||
and the value is the computed checksum. If a checksum is supplied and does not
|
||||
match the downloaded artifact, the driver will fail to start
|
||||
|
||||
* `accelerator` - (Optional) The type of accelerator to use in the invocation.
|
||||
If the host machine has `Qemu` installed with KVM support, users can specify
|
||||
`kvm` for the `accelerator`. Default is `tcg`
|
||||
* `port_map` - **(Optional)** A `map[string]int` that maps port labels to ports
|
||||
|
||||
* `port_map` - (Optional) A `map[string]int` that maps port labels to ports
|
||||
on the guest. This forwards the host port to the guest vm. For example,
|
||||
`port_map { db = 6539 }` would forward the host port with label `db` to the
|
||||
guest vm's port 6539.
|
||||
|
||||
@@ -18,15 +18,19 @@ As such, it should be used with extreme care and is disabled by default.
|
||||
|
||||
The `raw_exec` driver supports the following configuration in the job spec:
|
||||
|
||||
* `command` - (Required) The command to execute. Must be provided.
|
||||
* `artifact_source` – (Optional) Source location of an executable artifact. Must be accessible
|
||||
from the Nomad client. If you specify an `artifact_source` to be executed, you
|
||||
must reference it in the `command` as show in the examples below
|
||||
* `checksum` - **(Optional)** The checksum type and value for the `artifact_source` image.
|
||||
The format is `type:value`, where type is any of `md5`, `sha1`, `sha256`, or `sha512`,
|
||||
and the value is the computed checksum. If a checksum is supplied and does not
|
||||
match the downloaded artifact, the driver will fail to start
|
||||
* `args` - The argument list to the command, space seperated. Optional.
|
||||
* `command` - The command to execute. Must be provided.
|
||||
|
||||
* `artifact_source` – (Optional) Source location of an executable artifact. Must
|
||||
be accessible from the Nomad client. If you specify an `artifact_source` to be
|
||||
executed, you must reference it in the `command` as show in the examples below
|
||||
|
||||
* `checksum` - (Optional) The checksum type and value for the `artifact_source`
|
||||
image. The format is `type:value`, where type is any of `md5`, `sha1`,
|
||||
`sha256`, or `sha512`, and the value is the computed checksum. If a checksum
|
||||
is supplied and does not match the downloaded artifact, the driver will fail
|
||||
to start
|
||||
|
||||
* `args` - (Optional) A list of arguments to the `command`.
|
||||
|
||||
## Client Requirements
|
||||
|
||||
|
||||
@@ -20,13 +20,16 @@ being marked as experimental and should be used with care.
|
||||
|
||||
The `rkt` driver supports the following configuration in the job spec:
|
||||
|
||||
* `trust_prefix` - **(Optional)** The trust prefix to be passed to rkt. Must be reachable from
|
||||
the box running the nomad agent. If not specified, the image is run without
|
||||
verifying the image signature.
|
||||
* `image` - **(Required)** The image to run which may be specified by name,
|
||||
hash, ACI address or docker registry.
|
||||
* `command` - **(Optional**) A command to execute on the ACI.
|
||||
* `args` - **(Optional**) A string of args to pass into the image.
|
||||
* `image` - The image to run which may be specified by name, hash, ACI address
|
||||
or docker registry.
|
||||
|
||||
* `command` - (Optional) A command to execute on the ACI.
|
||||
|
||||
* `args` - (Optional) A list of arguments to the image.
|
||||
|
||||
* `trust_prefix` - (Optional) The trust prefix to be passed to rkt. Must be
|
||||
reachable from the box running the nomad agent. If not specified, the image is
|
||||
run without verifying the image signature.
|
||||
|
||||
## Task Directories
|
||||
|
||||
|
||||
@@ -64,8 +64,8 @@ clarify what is being discussed:
|
||||
Regions may contain multiple datacenters. Servers are assigned to regions and manage
|
||||
all state for the region and make scheduling decisions within that region. Requests that
|
||||
are made between regions are forwarded to the appropriate servers. As an example, you may
|
||||
have a "US" region with the "us-east-1" and "us-west-1" datacenters, connected to the
|
||||
"EU" region with the "eu-fr-1" and "eu-uk-1" datacenters.
|
||||
have a `US` region with the `us-east-1` and `us-west-1` datacenters, connected to the
|
||||
`EU` region with the `eu-fr-1` and `eu-uk-1` datacenters.
|
||||
|
||||
* **Bin Packing** - Bin Packing is the process of filling bins with items in a way that
|
||||
maximizes the utilization of bins. This extends to Nomad, where the clients are "bins"
|
||||
|
||||
@@ -47,6 +47,15 @@ job "my-service" {
|
||||
config {
|
||||
image = "hashicorp/web-frontend"
|
||||
}
|
||||
service {
|
||||
port = "http"
|
||||
check {
|
||||
type = "http"
|
||||
path = "/health"
|
||||
interval = "10s"
|
||||
timeout = "2s"
|
||||
}
|
||||
}
|
||||
env {
|
||||
DB_HOST = "db01.example.com"
|
||||
DB_USER = "web"
|
||||
@@ -57,10 +66,13 @@ job "my-service" {
|
||||
memory = 128
|
||||
network {
|
||||
mbits = 100
|
||||
dynamic_ports = [
|
||||
"http",
|
||||
"https",
|
||||
]
|
||||
# Request for a dynamic port
|
||||
port "http" {
|
||||
}
|
||||
# Request for a static port
|
||||
port "https" {
|
||||
static = 443
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -169,7 +181,7 @@ The `task` object supports the following keys:
|
||||
|
||||
* `driver` - Specifies the task driver that should be used to run the
|
||||
task. See the [driver documentation](/docs/drivers/index.html) for what
|
||||
is available. Examples include "docker", "qemu", "java", and "exec".
|
||||
is available. Examples include `docker`, `qemu`, `java`, and `exec`.
|
||||
|
||||
* `constraint` - This can be provided multiple times to define additional
|
||||
constraints. See the constraint reference for more details.
|
||||
@@ -178,6 +190,11 @@ The `task` object supports the following keys:
|
||||
to start the task. The details of configurations are specific to
|
||||
each driver.
|
||||
|
||||
* `service` - Nomad integrates with Consul for Service Discovery. A service
|
||||
block represents a routable and discoverable service on the network. Nomad
|
||||
automatically registers when a Task is started and de-registers it when the
|
||||
Task transitons to the DEAD state. To learn more about Services please visit [here](/docs/jobspec/servicediscovery.html.md)
|
||||
|
||||
* `env` - A map of key/value representing environment variables that
|
||||
will be passed along to the running process.
|
||||
|
||||
@@ -223,14 +240,14 @@ The `restart` object supports the following keys:
|
||||
number of restarts allowed in an `interval` before a restart delay is added.
|
||||
|
||||
* `interval` - `interval` is only valid on non-batch jobs and is a time duration
|
||||
that can be specified using the "s", "m", and "h" suffixes, such as "30s".
|
||||
that can be specified using the `s`, `m`, and `h` suffixes, such as `30s`.
|
||||
The `interval` begins when the first task starts and ensures that only
|
||||
`attempts` number of restarts happens within it. If more than `attempts`
|
||||
number of failures happen, the restart is delayed till after the `interval`,
|
||||
which is then reset.
|
||||
|
||||
* `delay` - A duration to wait before restarting a task. It is specified as a
|
||||
time duration using the "s", "m", and "h" suffixes, such as "30s".
|
||||
time duration using the `s`, `m`, and `h` suffixes, such as `30s`.
|
||||
|
||||
The default `batch` restart policy is:
|
||||
|
||||
@@ -266,7 +283,7 @@ The `constraint` object supports the following keys:
|
||||
This can be a literal value or another attribute.
|
||||
|
||||
* `version` - Specifies a version constraint against the attribute.
|
||||
This sets the operator to "version" and the `value` to what is
|
||||
This sets the operator to `version` and the `value` to what is
|
||||
specified. This supports a comma seperated list of constraints,
|
||||
including the pessimistic operator. See the
|
||||
[go-version](https://github.com/hashicorp/go-version) repository
|
||||
@@ -326,7 +343,7 @@ Below is a table documenting common node attributes:
|
||||
</tr>
|
||||
<tr>
|
||||
<td>arch</td>
|
||||
<td>CPU architecture of the client. Examples: "amd64", "386"</td>
|
||||
<td>CPU architecture of the client. Examples: `amd64`, `386`</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>consul.datacenter</td>
|
||||
@@ -346,11 +363,11 @@ Below is a table documenting common node attributes:
|
||||
</tr>
|
||||
<tr>
|
||||
<td>kernel.name</td>
|
||||
<td>Kernel of the client. Examples: "linux", "darwin"</td>
|
||||
<td>Kernel of the client. Examples: `linux`, `darwin`</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>kernel.version</td>
|
||||
<td>Version of the client kernel. Examples: "3.19.0-25-generic", "15.0.0"</td>
|
||||
<td>Version of the client kernel. Examples: `3.19.0-25-generic`, `15.0.0`</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>platform.aws.ami-id</td>
|
||||
@@ -362,7 +379,7 @@ Below is a table documenting common node attributes:
|
||||
</tr>
|
||||
<tr>
|
||||
<td>os.name</td>
|
||||
<td>Operating system of the client. Examples: "ubuntu", "windows", "darwin"</td>
|
||||
<td>Operating system of the client. Examples: `ubuntu`, `windows`, `darwin`</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>os.version</td>
|
||||
|
||||
120
website/source/docs/jobspec/servicediscovery.html.md
Normal file
120
website/source/docs/jobspec/servicediscovery.html.md
Normal file
@@ -0,0 +1,120 @@
|
||||
---
|
||||
layout: "docs"
|
||||
page_title: "Service Discovery in Nomad"
|
||||
sidebar_current: "docs-jobspec-service-discovery"
|
||||
description: |-
|
||||
Learn how to add service discovery to jobs
|
||||
---
|
||||
|
||||
# Service Discovery
|
||||
|
||||
Nomad schedules workloads of various types across a cluster of generic hosts.
|
||||
Because of this, placement is not known in advance and you will need to use
|
||||
service discovery to connect tasks to other services deployed across your
|
||||
cluster. Nomad integrates with [Consul](https://consul.io) to provide service
|
||||
discovery and monitoring.
|
||||
|
||||
Note that in order to use Consul with Nomad, you will need to configure and
|
||||
install Consul on your nodes alongside Nomad, or schedule it as a system job.
|
||||
Nomad does not currently run Consul for you.
|
||||
|
||||
## Configuration
|
||||
|
||||
* `consul.address`: This is a Nomad client configuration which can be used to
|
||||
override the default Consul Agent HTTP port that Nomad uses to connect to
|
||||
Consul. The default for this is `127.0.0.1:8500`.
|
||||
|
||||
## Service Definition Syntax
|
||||
|
||||
The service blocks in a Task definition defines a service which Nomad will
|
||||
register with Consul. Multiple Service blocks are allowed in a Task definition,
|
||||
which allow registering multiple services for a task that exposes multiple
|
||||
ports.
|
||||
|
||||
### Example
|
||||
|
||||
A brief example of a service definition in a Task
|
||||
|
||||
```
|
||||
group "database" {
|
||||
task "mysql" {
|
||||
driver = "docker"
|
||||
service {
|
||||
tags = ["master", "mysql"]
|
||||
port = "db"
|
||||
check {
|
||||
type = "tcp"
|
||||
delay = "10s"
|
||||
timeout = "2s"
|
||||
}
|
||||
}
|
||||
resources {
|
||||
cpu = 500
|
||||
memory = 1024
|
||||
network {
|
||||
mbits = 10
|
||||
port "db" {
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
* `name`: Nomad automatically determines the name of a Task. By default the
|
||||
name of a service is $(job-name)-$(task-group)-$(task-name). Users can
|
||||
explicitly name the service by specifying this option. If multiple services
|
||||
are defined for a Task then only one task can have the default name, all the
|
||||
services have to be explicitly named. Nomad will add the prefix ```$(job-name
|
||||
)-${task-group}-${task-name}``` prefix to each user defined name.
|
||||
|
||||
* `tags`: A list of tags associated with this Service.
|
||||
|
||||
* `port`: The port indicates the port associated with the Service. Users are
|
||||
required to specify a valid port label here which they have defined in the
|
||||
resources block. This could be a label to either a dynamic or a static port.
|
||||
If an incorrect port label is specified, Nomad doesn't register the service
|
||||
with Consul.
|
||||
|
||||
* `check`: A check block defines a health check associated with the service.
|
||||
Multiple check blocks are allowed for a service. Nomad currently supports
|
||||
only the `http` and `tcp` Consul Checks.
|
||||
|
||||
### Check Syntax
|
||||
|
||||
* `type`: This indicates the check types supported by Nomad. Valid options are
|
||||
currently `http` and `tcp`. In the future Nomad will add support for more
|
||||
Consul checks.
|
||||
|
||||
* `delay`: This indicates the frequency of the health checks that Consul with
|
||||
perform.
|
||||
|
||||
* `timeout`: This indicates how long Consul will wait for a health check query
|
||||
to succeed.
|
||||
|
||||
* `path`: The path of the http endpoint which Consul will query to query the
|
||||
health of a service if the type of the check is `http`. Nomad will add the ip
|
||||
of the service and the port, users are only required to add the relative url
|
||||
of the health check endpoint.
|
||||
|
||||
* `protocol`: This indicates the protocol for the http checks. Valid options
|
||||
are `http` and `https`. We default it to `http`
|
||||
|
||||
## Assumptions
|
||||
|
||||
* Consul 0.6 is needed for using the TCP checks.
|
||||
|
||||
* The Service Discovery feature in Nomad depends on Operators making sure that
|
||||
the Nomad client can reach the consul agent.
|
||||
|
||||
* Nomad assumes that it controls the life cycle of all the externally
|
||||
discoverable services running on a host.
|
||||
|
||||
* Tasks running inside Nomad also needs to reach out to the Consul agent if
|
||||
they want to use any of the Consul APIs. Ex: A task running inside a docker
|
||||
container in the bridge mode won't be able to talk to a Consul Agent running
|
||||
on the loopback interface of the host since the container in the bridge mode
|
||||
has it's own network interface and doesn't see interfaces on the global
|
||||
network namespace of the host. There are a couple of ways to solve this, one
|
||||
way is to run the container in the host networking mode, or make the Consul
|
||||
agent listen on an interface on the network namespace of the container.
|
||||
@@ -41,8 +41,8 @@
|
||||
<li<%= sidebar_current("docs-jobspec-schedulers") %>>
|
||||
<a href="/docs/jobspec/schedulers.html">Scheduler Types</a>
|
||||
</li>
|
||||
<li<%= sidebar_current("docs-jobspec-restarts") %>>
|
||||
<a href="/docs/jobspec/restarts.html">Restart Policies</a>
|
||||
<li<%= sidebar_current("docs-jobspec-service-discovery") %>>
|
||||
<a href="/docs/jobspec/servicediscovery.html">Service Discovery</a>
|
||||
</li>
|
||||
<li<%= sidebar_current("docs-jobspec-networking") %>>
|
||||
<a href="/docs/jobspec/networking.html">Networking</a>
|
||||
|
||||
Reference in New Issue
Block a user