Bring up-to-date with master

Ivo Verberk
2015-12-19 21:55:10 +01:00
62 changed files with 1208 additions and 510 deletions

View File

@@ -1,15 +1,36 @@
## 0.3.0
IMPROVEMENTS:
* Server join/retry-join command line and config options [GH-527]
* Enable raw_exec driver in dev mode [GH-558]
## 0.2.3 (December 17, 2015)
BUG FIXES:
* Shutdown a task now sends the interrupt signal first to the process before
forcefully killing it. [GH-543]
* Docker driver no longer leaks unix domain socket connections [GH-556]
* client: Fixes for user lookup to support CoreOS [GH-591]
* discovery: Fixes for service registration when multiple allocations are bin
packed on a node [GH-583]
* discovery: De-Registering Tasks while Nomad sleeps before failed tasks are
restarted.
* discovery: Using a random prefix for nomad managed services [GH-579]
* configuration: Sort configuration files [GH-588]
* core: Task States not being properly updated [GH-600]
* cli: RetryInterval was not being applied properly [GH-601]
## 0.2.1
## 0.2.2 (December 11, 2015)
IMPROVEMENTS:
* core: Enable `raw_exec` driver in dev mode [GH-558]
* cli: Server join/retry-join command line and config options [GH-527]
* cli: Nomad reports which config files are loaded at start time, or if none
are loaded [GH-536], [GH-553]
BUG FIXES:
* core: Send syslog to `LOCAL0` by default as previously documented [GH-547]
* consul: Nomad is less noisy when Consul is not running [GH-567]
* consul: Nomad only deregisters services that it created [GH-568]
* driver/docker: Docker driver no longer leaks unix domain socket connections
[GH-556]
* driver/exec: Shutdown a task now sends the interrupt signal first to the
process before forcefully killing it. [GH-543]
* fingerprint/network: Now correctly detects interfaces on Windows [GH-382]
* client: remove all calls to default logger [GH-570]
## 0.2.1 (November 28, 2015)
IMPROVEMENTS:

View File

@@ -48,8 +48,12 @@ vet:
@echo "--> Running go tool vet $(VETARGS) ."
@go tool vet $(VETARGS) . ; if [ $$? -eq 1 ]; then \
echo ""; \
echo "Vet found suspicious constructs. Please check the reported constructs"; \
echo "and fix them if necessary before submitting the code for reviewal."; \
echo "[LINT] Vet found suspicious constructs. Please check the reported constructs"; \
echo "and fix them if necessary before submitting the code for review."; \
fi
@git grep -n `echo "log"".Print"` ; if [ $$? -eq 0 ]; then \
echo "[LINT] Found "log"".Printf" calls. These should use Nomad's logger instead."; \
fi
web:

Vagrantfile vendored
View File

@@ -7,7 +7,7 @@ VAGRANTFILE_API_VERSION = "2"
$script = <<SCRIPT
# Install Prereq Packages
sudo apt-get update
sudo apt-get install -y build-essential curl git-core mercurial bzr libpcre3-dev pkg-config zip default-jre qemu
sudo apt-get install -y build-essential curl git-core mercurial bzr libpcre3-dev pkg-config zip default-jre qemu libc6-dev-i386 silversearcher-ag jq htop vim unzip
# Setup go, for development of Nomad
SRCROOT="/opt/go"
@@ -18,8 +18,8 @@ ARCH=`uname -m | sed 's|i686|386|' | sed 's|x86_64|amd64|'`
# Install Go
cd /tmp
wget -q https://storage.googleapis.com/golang/go1.5.1.linux-${ARCH}.tar.gz
tar -xf go1.5.1.linux-${ARCH}.tar.gz
wget -q https://storage.googleapis.com/golang/go1.5.2.linux-${ARCH}.tar.gz
tar -xf go1.5.2.linux-${ARCH}.tar.gz
sudo mv go $SRCROOT
sudo chmod 775 $SRCROOT
sudo chown vagrant:vagrant $SRCROOT
@@ -42,7 +42,7 @@ source /etc/profile.d/gopath.sh
echo Fetching Consul...
cd /tmp/
wget https://releases.hashicorp.com/consul/0.6.0-rc2/consul_0.6.0-rc2_linux_amd64.zip -O consul.zip
wget https://releases.hashicorp.com/consul/0.6.0/consul_0.6.0_linux_amd64.zip -O consul.zip
echo Installing Consul...
unzip consul.zip
sudo chmod +x consul

View File

@@ -47,6 +47,7 @@ type Allocation struct {
TaskGroup string
Resources *Resources
TaskResources map[string]*Resources
Services map[string]string
Metrics *AllocationMetric
DesiredStatus string
DesiredDescription string

View File

@@ -69,7 +69,6 @@ func TestCompose(t *testing.T) {
Operand: "=",
},
},
RestartPolicy: NewRestartPolicy(),
Tasks: []*Task{
&Task{
Name: "task1",

View File

@@ -7,17 +7,11 @@ import (
// RestartPolicy defines how the Nomad client restarts
// tasks in a taskgroup when they fail
type RestartPolicy struct {
Interval time.Duration
Attempts int
Delay time.Duration
}
func NewRestartPolicy() *RestartPolicy {
return &RestartPolicy{
Attempts: 10,
Interval: 3 * time.Minute,
Delay: 5 * time.Second,
}
Interval time.Duration
Attempts int
Delay time.Duration
RestartOnSuccess bool
Mode string
}
// The ServiceCheck data model represents the consul health check that
@@ -54,11 +48,9 @@ type TaskGroup struct {
// NewTaskGroup creates a new TaskGroup.
func NewTaskGroup(name string, count int) *TaskGroup {
restartPolicy := NewRestartPolicy()
return &TaskGroup{
Name: name,
Count: count,
RestartPolicy: restartPolicy,
Name: name,
Count: count,
}
}

View File

@@ -8,9 +8,8 @@ import (
func TestTaskGroup_NewTaskGroup(t *testing.T) {
grp := NewTaskGroup("grp1", 2)
expect := &TaskGroup{
Name: "grp1",
Count: 2,
RestartPolicy: NewRestartPolicy(),
Name: "grp1",
Count: 2,
}
if !reflect.DeepEqual(grp, expect) {
t.Fatalf("expect: %#v, got: %#v", expect, grp)

View File

@@ -17,9 +17,6 @@ func assertWriteMeta(t *testing.T, wm *WriteMeta) {
if wm.LastIndex == 0 {
t.Fatalf("bad index: %d", wm.LastIndex)
}
if wm.RequestTime == 0 {
t.Fatalf("bad request time: %d", wm.RequestTime)
}
}
func testJob() *Job {

View File

@@ -110,9 +110,9 @@ func (r *AllocRunner) RestoreState() error {
r.restored[name] = struct{}{}
task := &structs.Task{Name: name}
restartTracker := newRestartTracker(r.alloc.Job.Type, r.RestartPolicy)
restartTracker := newRestartTracker(r.RestartPolicy)
tr := NewTaskRunner(r.logger, r.config, r.setTaskState, r.ctx,
r.alloc.ID, task, r.alloc.TaskStates[task.Name], restartTracker,
r.alloc, task, r.alloc.TaskStates[task.Name], restartTracker,
r.consulService)
r.tasks[name] = tr
@@ -322,9 +322,9 @@ func (r *AllocRunner) Run() {
// Merge in the task resources
task.Resources = alloc.TaskResources[task.Name]
restartTracker := newRestartTracker(r.alloc.Job.Type, r.RestartPolicy)
restartTracker := newRestartTracker(r.RestartPolicy)
tr := NewTaskRunner(r.logger, r.config, r.setTaskState, r.ctx,
r.alloc.ID, task, r.alloc.TaskStates[task.Name], restartTracker,
r.alloc, task, r.alloc.TaskStates[task.Name], restartTracker,
r.consulService)
r.tasks[task.Name] = tr
go tr.Run()

View File

@@ -31,7 +31,7 @@ func testAllocRunner(restarts bool) (*MockAllocStateUpdater, *AllocRunner) {
conf.AllocDir = os.TempDir()
upd := &MockAllocStateUpdater{}
alloc := mock.Alloc()
consulClient, _ := NewConsulService(logger, "127.0.0.1:8500", "", "", false, false)
consulClient, _ := NewConsulService(&consulServiceConfig{logger, "127.0.0.1:8500", "", "", false, false, &structs.Node{}})
if !restarts {
alloc.Job.Type = structs.JobTypeBatch
*alloc.Job.LookupTaskGroup(alloc.TaskGroup).RestartPolicy = structs.RestartPolicy{Attempts: 0}
@@ -142,7 +142,7 @@ func TestAllocRunner_SaveRestoreState(t *testing.T) {
}
// Create a new alloc runner
consulClient, err := NewConsulService(ar.logger, "127.0.0.1:8500", "", "", false, false)
consulClient, err := NewConsulService(&consulServiceConfig{ar.logger, "127.0.0.1:8500", "", "", false, false, &structs.Node{}})
ar2 := NewAllocRunner(ar.logger, ar.config, upd.Update,
&structs.Allocation{ID: ar.alloc.ID}, consulClient)
err = ar2.RestoreState()

View File

@@ -5,7 +5,6 @@ package allocdir
import (
"fmt"
"github.com/hashicorp/nomad/helper/user-lookup"
"os"
"os/user"
"strconv"
@@ -27,7 +26,7 @@ func (d *AllocDir) dropDirPermissions(path string) error {
return nil
}
u, err := userlookup.Lookup("nobody")
u, err := user.Lookup("nobody")
if err != nil {
return err
}

View File

@@ -157,7 +157,16 @@ func (c *Client) setupConsulService() error {
auth := c.config.Read("consul.auth")
enableSSL := c.config.ReadBoolDefault("consul.ssl", false)
verifySSL := c.config.ReadBoolDefault("consul.verifyssl", true)
if consulService, err = NewConsulService(c.logger, addr, token, auth, enableSSL, verifySSL); err != nil {
consulServiceCfg := &consulServiceConfig{
logger: c.logger,
consulAddr: addr,
token: token,
auth: auth,
enableSSL: enableSSL,
verifySSL: verifySSL,
node: c.config.Node,
}
if consulService, err = NewConsulService(consulServiceCfg); err != nil {
return err
}
c.consulService = consulService
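
Editor's note: the change above replaces NewConsulService's positional parameters with a single consulServiceConfig struct. A minimal, self-contained Go sketch (not part of this commit; serviceConfig and newService are illustrative names) of why the config-struct pattern scales better — a new field such as the node can be added without touching every caller:

package main

import "fmt"

// serviceConfig stands in for consulServiceConfig: optional settings live in
// one struct, so adding a field (e.g. nodeName) does not change the
// constructor signature.
type serviceConfig struct {
	addr      string
	token     string
	enableSSL bool
	nodeName  string // later addition; older callers simply leave it zero
}

func newService(cfg *serviceConfig) string {
	scheme := "http"
	if cfg.enableSSL {
		scheme = "https"
	}
	return fmt.Sprintf("%s://%s (node %q)", scheme, cfg.addr, cfg.nodeName)
}

func main() {
	fmt.Println(newService(&serviceConfig{addr: "127.0.0.1:8500", nodeName: "client-1"}))
}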

View File

@@ -47,8 +47,8 @@ func (a *consulApiClient) ServiceRegister(service *consul.AgentServiceRegistrati
return a.client.Agent().ServiceRegister(service)
}
func (a *consulApiClient) ServiceDeregister(serviceId string) error {
return a.client.Agent().ServiceDeregister(serviceId)
func (a *consulApiClient) ServiceDeregister(serviceID string) error {
return a.client.Agent().ServiceDeregister(serviceID)
}
func (a *consulApiClient) Services() (map[string]*consul.AgentService, error) {
@@ -62,8 +62,8 @@ func (a *consulApiClient) Checks() (map[string]*consul.AgentCheck, error) {
// trackedTask is a Task that we are tracking for changes in service and check
// definitions and keep them synced with the Consul Agent
type trackedTask struct {
allocID string
task *structs.Task
task *structs.Task
alloc *structs.Allocation
}
// ConsulService is the service which tracks tasks and syncs the services and
@@ -72,31 +72,41 @@ type ConsulService struct {
client consulApi
logger *log.Logger
shutdownCh chan struct{}
node *structs.Node
trackedTasks map[string]*trackedTask
serviceStates map[string]string
trackedTskLock sync.Mutex
}
type consulServiceConfig struct {
logger *log.Logger
consulAddr string
token string
auth string
enableSSL bool
verifySSL bool
node *structs.Node
}
// A factory method to create a new consul service
func NewConsulService(logger *log.Logger, consulAddr string, token string,
auth string, enableSSL bool, verifySSL bool) (*ConsulService, error) {
func NewConsulService(config *consulServiceConfig) (*ConsulService, error) {
var err error
var c *consul.Client
cfg := consul.DefaultConfig()
cfg.Address = consulAddr
if token != "" {
cfg.Token = token
cfg.Address = config.consulAddr
if config.token != "" {
cfg.Token = config.token
}
if auth != "" {
if config.auth != "" {
var username, password string
if strings.Contains(auth, ":") {
split := strings.SplitN(auth, ":", 2)
if strings.Contains(config.auth, ":") {
split := strings.SplitN(config.auth, ":", 2)
username = split[0]
password = split[1]
} else {
username = auth
username = config.auth
}
cfg.HttpAuth = &consul.HttpBasicAuth{
@@ -104,10 +114,10 @@ func NewConsulService(logger *log.Logger, consulAddr string, token string,
Password: password,
}
}
if enableSSL {
if config.enableSSL {
cfg.Scheme = "https"
}
if enableSSL && !verifySSL {
if config.enableSSL && !config.verifySSL {
cfg.HttpClient.Transport = &http.Transport{
TLSClientConfig: &tls.Config{
InsecureSkipVerify: true,
@@ -121,7 +131,8 @@ func NewConsulService(logger *log.Logger, consulAddr string, token string,
consulService := ConsulService{
client: &consulApiClient{client: c},
logger: logger,
logger: config.logger,
node: config.node,
trackedTasks: make(map[string]*trackedTask),
serviceStates: make(map[string]string),
shutdownCh: make(chan struct{}),
@@ -132,15 +143,15 @@ func NewConsulService(logger *log.Logger, consulAddr string, token string,
// Register starts tracking a task for changes to its services and checks and
// adds/removes services and checks associated with it.
func (c *ConsulService) Register(task *structs.Task, allocID string) error {
func (c *ConsulService) Register(task *structs.Task, alloc *structs.Allocation) error {
var mErr multierror.Error
c.trackedTskLock.Lock()
tt := &trackedTask{allocID: allocID, task: task}
c.trackedTasks[fmt.Sprintf("%s-%s", allocID, task.Name)] = tt
tt := &trackedTask{task: task, alloc: alloc}
c.trackedTasks[fmt.Sprintf("%s-%s", alloc.ID, task.Name)] = tt
c.trackedTskLock.Unlock()
for _, service := range task.Services {
c.logger.Printf("[INFO] consul: registering service %s with consul.", service.Name)
if err := c.registerService(service, task, allocID); err != nil {
if err := c.registerService(service, task, alloc); err != nil {
mErr.Errors = append(mErr.Errors, err)
}
}
@@ -150,18 +161,19 @@ func (c *ConsulService) Register(task *structs.Task, allocID string) error {
// Deregister stops tracking a task for changes to its services and checks and
// removes all the services and checks associated with the Task
func (c *ConsulService) Deregister(task *structs.Task, allocID string) error {
func (c *ConsulService) Deregister(task *structs.Task, alloc *structs.Allocation) error {
var mErr multierror.Error
c.trackedTskLock.Lock()
delete(c.trackedTasks, fmt.Sprintf("%s-%s", allocID, task.Name))
delete(c.trackedTasks, fmt.Sprintf("%s-%s", alloc.ID, task.Name))
c.trackedTskLock.Unlock()
for _, service := range task.Services {
if service.Id == "" {
serviceID := alloc.Services[service.Name]
if serviceID == "" {
continue
}
c.logger.Printf("[INFO] consul: deregistering service %v with consul", service.Name)
if err := c.deregisterService(service.Id); err != nil {
c.logger.Printf("[DEBUG] consul: error in deregistering service %v from consul", service.Name)
if err := c.deregisterService(serviceID); err != nil {
c.printLogMessage("[DEBUG] consul: error in deregistering service %v from consul", service.Name)
mErr.Errors = append(mErr.Errors, err)
}
}
@@ -193,9 +205,18 @@ func (c *ConsulService) SyncWithConsul() {
// services which are no longer present in tasks
func (c *ConsulService) performSync() {
// Get the list of the services and that Consul knows about
consulServices, _ := c.client.Services()
consulChecks, _ := c.client.Checks()
delete(consulServices, "consul")
srvcs, err := c.client.Services()
if err != nil {
return
}
chks, err := c.client.Checks()
if err != nil {
return
}
// Filter out the services and checks that aren't managed by Nomad
consulServices := c.filterConsulServices(srvcs)
consulChecks := c.filterConsulChecks(chks)
knownChecks := make(map[string]struct{})
knownServices := make(map[string]struct{})
@@ -203,28 +224,30 @@ func (c *ConsulService) performSync() {
// Add services and checks which Consul doesn't know about
for _, trackedTask := range c.trackedTasks {
for _, service := range trackedTask.task.Services {
serviceID := trackedTask.alloc.Services[service.Name]
// Add new services which Consul agent isn't aware of
knownServices[service.Id] = struct{}{}
if _, ok := consulServices[service.Id]; !ok {
c.logger.Printf("[INFO] consul: registering service %s with consul.", service.Name)
c.registerService(service, trackedTask.task, trackedTask.allocID)
knownServices[serviceID] = struct{}{}
if _, ok := consulServices[serviceID]; !ok {
c.printLogMessage("[INFO] consul: registering service %s with consul.", service.Name)
c.registerService(service, trackedTask.task, trackedTask.alloc)
continue
}
// If a service has changed, re-register it with Consul agent
if service.Hash() != c.serviceStates[service.Id] {
c.logger.Printf("[INFO] consul: reregistering service %s with consul.", service.Name)
c.registerService(service, trackedTask.task, trackedTask.allocID)
if service.Hash() != c.serviceStates[serviceID] {
c.printLogMessage("[INFO] consul: reregistering service %s with consul.", service.Name)
c.registerService(service, trackedTask.task, trackedTask.alloc)
continue
}
// Add new checks that Consul isn't aware of
for _, check := range service.Checks {
knownChecks[check.Id] = struct{}{}
if _, ok := consulChecks[check.Id]; !ok {
checkID := check.Hash(serviceID)
knownChecks[checkID] = struct{}{}
if _, ok := consulChecks[checkID]; !ok {
host, port := trackedTask.task.FindHostAndPortFor(service.PortLabel)
cr := c.makeCheck(service, check, host, port)
cr := c.makeCheck(serviceID, check, host, port)
c.registerCheck(cr)
}
}
@@ -232,9 +255,9 @@ func (c *ConsulService) performSync() {
}
// Remove services from the service tracker which no longer exists
for serviceId := range c.serviceStates {
if _, ok := knownServices[serviceId]; !ok {
delete(c.serviceStates, serviceId)
for serviceID := range c.serviceStates {
if _, ok := knownServices[serviceID]; !ok {
delete(c.serviceStates, serviceID)
}
}
@@ -242,7 +265,7 @@ func (c *ConsulService) performSync() {
for _, consulService := range consulServices {
if _, ok := knownServices[consulService.ID]; !ok {
delete(c.serviceStates, consulService.ID)
c.logger.Printf("[INFO] consul: deregistering service %v with consul", consulService.Service)
c.printLogMessage("[INFO] consul: deregistering service %v with consul", consulService.Service)
c.deregisterService(consulService.ID)
}
}
@@ -256,16 +279,17 @@ func (c *ConsulService) performSync() {
}
// registerService registers a Service with Consul
func (c *ConsulService) registerService(service *structs.Service, task *structs.Task, allocID string) error {
func (c *ConsulService) registerService(service *structs.Service, task *structs.Task, alloc *structs.Allocation) error {
var mErr multierror.Error
host, port := task.FindHostAndPortFor(service.PortLabel)
if host == "" || port == 0 {
return fmt.Errorf("consul: the port:%q marked for registration of service: %q couldn't be found", service.PortLabel, service.Name)
}
c.serviceStates[service.Id] = service.Hash()
serviceID := alloc.Services[service.Name]
c.serviceStates[serviceID] = service.Hash()
asr := &consul.AgentServiceRegistration{
ID: service.Id,
ID: serviceID,
Name: service.Name,
Tags: service.Tags,
Port: port,
@@ -273,13 +297,13 @@ func (c *ConsulService) registerService(service *structs.Service, task *structs.
}
if err := c.client.ServiceRegister(asr); err != nil {
c.logger.Printf("[DEBUG] consul: error while registering service %v with consul: %v", service.Name, err)
c.printLogMessage("[DEBUG] consul: error while registering service %v with consul: %v", service.Name, err)
mErr.Errors = append(mErr.Errors, err)
}
for _, check := range service.Checks {
cr := c.makeCheck(service, check, host, port)
cr := c.makeCheck(serviceID, check, host, port)
if err := c.registerCheck(cr); err != nil {
c.logger.Printf("[DEBUG] consul: error while registerting check %v with consul: %v", check.Name, err)
c.printLogMessage("[DEBUG] consul: error while registerting check %v with consul: %v", check.Name, err)
mErr.Errors = append(mErr.Errors, err)
}
@@ -289,31 +313,32 @@ func (c *ConsulService) registerService(service *structs.Service, task *structs.
// registerCheck registers a check with Consul
func (c *ConsulService) registerCheck(check *consul.AgentCheckRegistration) error {
c.logger.Printf("[INFO] consul: registering Check with ID: %v for service: %v", check.ID, check.ServiceID)
c.printLogMessage("[INFO] consul: registering check with ID: %s for service: %s", check.ID, check.ServiceID)
return c.client.CheckRegister(check)
}
// deregisterCheck de-registers a check with a specific ID from Consul
func (c *ConsulService) deregisterCheck(checkID string) error {
c.logger.Printf("[INFO] consul: removing check with ID: %v", checkID)
c.printLogMessage("[INFO] consul: removing check with ID: %v", checkID)
return c.client.CheckDeregister(checkID)
}
// deregisterService de-registers a Service with a specific id from Consul
func (c *ConsulService) deregisterService(serviceId string) error {
delete(c.serviceStates, serviceId)
if err := c.client.ServiceDeregister(serviceId); err != nil {
func (c *ConsulService) deregisterService(serviceID string) error {
delete(c.serviceStates, serviceID)
if err := c.client.ServiceDeregister(serviceID); err != nil {
return err
}
return nil
}
// makeCheck creates a Consul Check Registration struct
func (c *ConsulService) makeCheck(service *structs.Service, check *structs.ServiceCheck, ip string, port int) *consul.AgentCheckRegistration {
func (c *ConsulService) makeCheck(serviceID string, check *structs.ServiceCheck, ip string, port int) *consul.AgentCheckRegistration {
checkID := check.Hash(serviceID)
cr := &consul.AgentCheckRegistration{
ID: check.Id,
ID: checkID,
Name: check.Name,
ServiceID: service.Id,
ServiceID: serviceID,
}
cr.Interval = check.Interval.String()
cr.Timeout = check.Timeout.String()
@@ -336,3 +361,38 @@ func (c *ConsulService) makeCheck(service *structs.Service, check *structs.Servi
}
return cr
}
// filterConsulServices prunes out all the services whose IDs are not prefixed
// with nomad-
func (c *ConsulService) filterConsulServices(srvcs map[string]*consul.AgentService) map[string]*consul.AgentService {
nomadServices := make(map[string]*consul.AgentService)
delete(srvcs, "consul")
for _, srv := range srvcs {
if strings.HasPrefix(srv.ID, structs.NomadConsulPrefix) {
nomadServices[srv.ID] = srv
}
}
return nomadServices
}
// filterConsulChecks prunes out all the consul checks which do not have
// services with IDs prefixed with nomad-
func (c *ConsulService) filterConsulChecks(chks map[string]*consul.AgentCheck) map[string]*consul.AgentCheck {
nomadChecks := make(map[string]*consul.AgentCheck)
for _, chk := range chks {
if strings.HasPrefix(chk.ServiceID, structs.NomadConsulPrefix) {
nomadChecks[chk.CheckID] = chk
}
}
return nomadChecks
}
// printLogMessage prints log messages only when the node attributes have consul
// related information
func (c *ConsulService) printLogMessage(message string, v ...interface{}) {
if _, ok := c.node.Attributes["consul.version"]; ok {
c.logger.Println(fmt.Sprintf(message, v...))
}
}

View File

@@ -1,10 +1,13 @@
package client
import (
"fmt"
consul "github.com/hashicorp/consul/api"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"log"
"os"
"reflect"
"testing"
"time"
)
@@ -31,7 +34,7 @@ func (a *mockConsulApiClient) ServiceRegister(service *consul.AgentServiceRegist
return nil
}
func (a *mockConsulApiClient) ServiceDeregister(serviceId string) error {
func (a *mockConsulApiClient) ServiceDeregister(serviceID string) error {
a.serviceDeregisterCallCount += 1
return nil
}
@@ -46,7 +49,7 @@ func (a *mockConsulApiClient) Checks() (map[string]*consul.AgentCheck, error) {
func newConsulService() *ConsulService {
logger := log.New(os.Stdout, "logger: ", log.Lshortfile)
c, _ := NewConsulService(logger, "", "", "", false, false)
c, _ := NewConsulService(&consulServiceConfig{logger, "", "", "", false, false, &structs.Node{}})
c.client = &mockConsulApiClient{}
return c
}
@@ -69,7 +72,6 @@ func newTask() *structs.Task {
func TestConsul_MakeChecks(t *testing.T) {
service := &structs.Service{
Id: "Foo",
Name: "Bar",
Checks: []*structs.ServiceCheck{
{
@@ -94,10 +96,11 @@ func TestConsul_MakeChecks(t *testing.T) {
}
c := newConsulService()
serviceID := fmt.Sprintf("%s-1234", structs.NomadConsulPrefix)
check1 := c.makeCheck(service, service.Checks[0], "10.10.0.1", 8090)
check2 := c.makeCheck(service, service.Checks[1], "10.10.0.1", 8090)
check3 := c.makeCheck(service, service.Checks[2], "10.10.0.1", 8090)
check1 := c.makeCheck(serviceID, service.Checks[0], "10.10.0.1", 8090)
check2 := c.makeCheck(serviceID, service.Checks[1], "10.10.0.1", 8090)
check3 := c.makeCheck(serviceID, service.Checks[2], "10.10.0.1", 8090)
if check1.HTTP != "http://10.10.0.1:8090/foo/bar" {
t.Fatalf("Invalid http url for check: %v", check1.HTTP)
@@ -141,7 +144,6 @@ func TestConsul_InvalidPortLabelForService(t *testing.T) {
},
}
service := &structs.Service{
Id: "service-id",
Name: "foo",
Tags: []string{"a", "b"},
PortLabel: "https",
@@ -149,7 +151,7 @@ func TestConsul_InvalidPortLabelForService(t *testing.T) {
}
c := newConsulService()
if err := c.registerService(service, task, "allocid"); err == nil {
if err := c.registerService(service, task, mock.Alloc()); err == nil {
t.Fatalf("Service should be invalid")
}
}
@@ -174,7 +176,7 @@ func TestConsul_Services_Deleted_From_Task(t *testing.T) {
},
},
}
c.Register(&task, "1")
c.Register(&task, mock.Alloc())
if len(c.serviceStates) != 1 {
t.Fatalf("Expected tracked services: %v, Actual: %v", 1, len(c.serviceStates))
}
@@ -190,13 +192,14 @@ func TestConsul_Service_Should_Be_Re_Reregistered_On_Change(t *testing.T) {
c := newConsulService()
task := newTask()
s1 := structs.Service{
Id: "1-example-cache-redis",
Name: "example-cache-redis",
Tags: []string{"global"},
PortLabel: "db",
}
task.Services = append(task.Services, &s1)
c.Register(task, "1")
alloc := mock.Alloc()
serviceID := alloc.Services[s1.Name]
c.Register(task, alloc)
s1.Tags = []string{"frontcache"}
@@ -206,8 +209,8 @@ func TestConsul_Service_Should_Be_Re_Reregistered_On_Change(t *testing.T) {
t.Fatal("We should be tracking one service")
}
if c.serviceStates[s1.Id] != s1.Hash() {
t.Fatalf("Hash is %v, expected %v", c.serviceStates[s1.Id], s1.Hash())
if c.serviceStates[serviceID] != s1.Hash() {
t.Fatalf("Hash is %v, expected %v", c.serviceStates[serviceID], s1.Hash())
}
}
@@ -218,14 +221,13 @@ func TestConsul_AddCheck_To_Service(t *testing.T) {
task := newTask()
var checks []*structs.ServiceCheck
s1 := structs.Service{
Id: "1-example-cache-redis",
Name: "example-cache-redis",
Tags: []string{"global"},
PortLabel: "db",
Checks: checks,
}
task.Services = append(task.Services, &s1)
c.Register(task, "1")
c.Register(task, mock.Alloc())
check1 := structs.ServiceCheck{
Name: "alive",
@@ -249,14 +251,13 @@ func TestConsul_ModifyCheck(t *testing.T) {
task := newTask()
var checks []*structs.ServiceCheck
s1 := structs.Service{
Id: "1-example-cache-redis",
Name: "example-cache-redis",
Tags: []string{"global"},
PortLabel: "db",
Checks: checks,
}
task.Services = append(task.Services, &s1)
c.Register(task, "1")
c.Register(task, mock.Alloc())
check1 := structs.ServiceCheck{
Name: "alive",
@@ -278,3 +279,79 @@ func TestConsul_ModifyCheck(t *testing.T) {
t.Fatalf("Expected number of check registrations: %v, Actual: %v", 2, apiClient.checkRegisterCallCount)
}
}
func TestConsul_FilterNomadServicesAndChecks(t *testing.T) {
c := newConsulService()
srvs := map[string]*consul.AgentService{
"foo-bar": {
ID: "foo-bar",
Service: "http-frontend",
Tags: []string{"global"},
Port: 8080,
Address: "10.10.1.11",
},
"nomad-registered-service-2121212": {
ID: "nomad-registered-service-2121212",
Service: "identity-service",
Tags: []string{"global"},
Port: 8080,
Address: "10.10.1.11",
},
}
expSrvcs := map[string]*consul.AgentService{
"nomad-registered-service-2121212": {
ID: "nomad-registered-service-2121212",
Service: "identity-service",
Tags: []string{"global"},
Port: 8080,
Address: "10.10.1.11",
},
}
nomadServices := c.filterConsulServices(srvs)
if !reflect.DeepEqual(expSrvcs, nomadServices) {
t.Fatalf("Expected: %v, Actual: %v", expSrvcs, nomadServices)
}
nomadServices = c.filterConsulServices(nil)
if len(nomadServices) != 0 {
t.Fatalf("Expected number of services: %v, Actual: %v", 0, len(nomadServices))
}
chks := map[string]*consul.AgentCheck{
"foo-bar-chk": {
CheckID: "foo-bar-chk",
ServiceID: "foo-bar",
Name: "alive",
},
"212121212": {
CheckID: "212121212",
ServiceID: "nomad-registered-service-2121212",
Name: "ping",
},
}
expChks := map[string]*consul.AgentCheck{
"212121212": {
CheckID: "212121212",
ServiceID: "nomad-registered-service-2121212",
Name: "ping",
},
}
nomadChecks := c.filterConsulChecks(chks)
if !reflect.DeepEqual(expChks, nomadChecks) {
t.Fatalf("Expected: %v, Actual: %v", expChks, nomadChecks)
}
if len(nomadChecks) != 1 {
t.Fatalf("Expected number of checks: %v, Actual: %v", 1, len(nomadChecks))
}
nomadChecks = c.filterConsulChecks(nil)
if len(nomadChecks) != 0 {
t.Fatalf("Expected number of checks: %v, Actual: %v", 0, len(nomadChecks))
}
}

View File

@@ -455,27 +455,27 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle
},
})
if err != nil {
log.Printf("[ERR] driver.docker: failed to query list of containers matching name:%s", config.Name)
d.logger.Printf("[ERR] driver.docker: failed to query list of containers matching name:%s", config.Name)
return nil, fmt.Errorf("Failed to query list of containers: %s", err)
}
if len(containers) != 1 {
log.Printf("[ERR] driver.docker: failed to get id for container %s", config.Name)
d.logger.Printf("[ERR] driver.docker: failed to get id for container %s", config.Name)
return nil, fmt.Errorf("Failed to get id for container %s", config.Name)
}
log.Printf("[INFO] driver.docker: a container with the name %s already exists; will attempt to purge and re-create", config.Name)
d.logger.Printf("[INFO] driver.docker: a container with the name %s already exists; will attempt to purge and re-create", config.Name)
err = client.RemoveContainer(docker.RemoveContainerOptions{
ID: containers[0].ID,
})
if err != nil {
log.Printf("[ERR] driver.docker: failed to purge container %s", config.Name)
d.logger.Printf("[ERR] driver.docker: failed to purge container %s", config.Name)
return nil, fmt.Errorf("Failed to purge container %s: %s", config.Name, err)
}
log.Printf("[INFO] driver.docker: purged container %s", config.Name)
d.logger.Printf("[INFO] driver.docker: purged container %s", config.Name)
container, err = client.CreateContainer(config)
if err != nil {
log.Printf("[ERR] driver.docker: failed to re-create container %s; aborting", config.Name)
d.logger.Printf("[ERR] driver.docker: failed to re-create container %s; aborting", config.Name)
return nil, fmt.Errorf("Failed to re-create container %s; aborting", config.Name)
}
} else {
@@ -593,10 +593,10 @@ func (h *DockerHandle) Kill() error {
// Stop the container
err := h.client.StopContainer(h.containerID, 5)
if err != nil {
log.Printf("[ERR] driver.docker: failed to stop container %s", h.containerID)
h.logger.Printf("[ERR] driver.docker: failed to stop container %s", h.containerID)
return fmt.Errorf("Failed to stop container %s: %s", h.containerID, err)
}
log.Printf("[INFO] driver.docker: stopped container %s", h.containerID)
h.logger.Printf("[INFO] driver.docker: stopped container %s", h.containerID)
// Cleanup container
if h.cleanupContainer {
@@ -605,10 +605,10 @@ func (h *DockerHandle) Kill() error {
RemoveVolumes: true,
})
if err != nil {
log.Printf("[ERR] driver.docker: failed to remove container %s", h.containerID)
h.logger.Printf("[ERR] driver.docker: failed to remove container %s", h.containerID)
return fmt.Errorf("Failed to remove container %s: %s", h.containerID, err)
}
log.Printf("[INFO] driver.docker: removed container %s", h.containerID)
h.logger.Printf("[INFO] driver.docker: removed container %s", h.containerID)
}
// Cleanup image. This operation may fail if the image is in use by another
@@ -624,17 +624,17 @@ func (h *DockerHandle) Kill() error {
},
})
if err != nil {
log.Printf("[ERR] driver.docker: failed to query list of containers matching image:%s", h.imageID)
h.logger.Printf("[ERR] driver.docker: failed to query list of containers matching image:%s", h.imageID)
return fmt.Errorf("Failed to query list of containers: %s", err)
}
inUse := len(containers)
if inUse > 0 {
log.Printf("[INFO] driver.docker: image %s is still in use by %d container(s)", h.imageID, inUse)
h.logger.Printf("[INFO] driver.docker: image %s is still in use by %d container(s)", h.imageID, inUse)
} else {
return fmt.Errorf("Failed to remove image %s", h.imageID)
}
} else {
log.Printf("[INFO] driver.docker: removed image %s", h.imageID)
h.logger.Printf("[INFO] driver.docker: removed image %s", h.imageID)
}
}
return nil
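
Editor's note: the docker driver hunks above swap the global log.Printf calls for the handle's injected logger ("client: remove all calls to default logger [GH-570]" in the changelog). A self-contained sketch of that pattern; dockerishDriver and stop are illustrative names, not from the Nomad codebase:

package main

import (
	"log"
	"os"
)

// dockerishDriver holds an injected *log.Logger instead of calling the
// package-level log functions, so all output flows through the logger the
// agent configured.
type dockerishDriver struct {
	logger *log.Logger
}

func (d *dockerishDriver) stop(containerID string) {
	d.logger.Printf("[INFO] driver.docker: stopped container %s", containerID)
}

func main() {
	logger := log.New(os.Stderr, "", log.LstdFlags)
	d := &dockerishDriver{logger: logger}
	d.stop("abc123")
}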

View File

@@ -24,7 +24,6 @@ import (
"github.com/hashicorp/nomad/client/driver/spawn"
cstructs "github.com/hashicorp/nomad/client/driver/structs"
"github.com/hashicorp/nomad/helper/args"
"github.com/hashicorp/nomad/helper/user-lookup"
"github.com/hashicorp/nomad/nomad/structs"
)
@@ -124,7 +123,7 @@ func (e *LinuxExecutor) ID() (string, error) {
// runAs takes a user id as a string and looks up the user, and sets the command
// to execute as that user.
func (e *LinuxExecutor) runAs(userid string) error {
u, err := userlookup.Lookup(userid)
u, err := user.Lookup(userid)
if err != nil {
return fmt.Errorf("Failed to identify user %v: %v", userid, err)
}
@@ -346,16 +345,17 @@ func (e *LinuxExecutor) cleanTaskDir() error {
// cgroup configuration. It returns an error if the resources are invalid.
func (e *LinuxExecutor) configureCgroups(resources *structs.Resources) error {
e.groups = &cgroupConfig.Cgroup{}
e.groups.Resources = &cgroupConfig.Resources{}
e.groups.Name = structs.GenerateUUID()
// TODO: verify this is needed for things like network access
e.groups.AllowAllDevices = true
e.groups.Resources.AllowAllDevices = true
if resources.MemoryMB > 0 {
// Total amount of memory allowed to consume
e.groups.Memory = int64(resources.MemoryMB * 1024 * 1024)
e.groups.Resources.Memory = int64(resources.MemoryMB * 1024 * 1024)
// Disable swap to avoid issues on the machine
e.groups.MemorySwap = int64(-1)
e.groups.Resources.MemorySwap = int64(-1)
}
if resources.CPU < 2 {
@@ -363,7 +363,7 @@ func (e *LinuxExecutor) configureCgroups(resources *structs.Resources) error {
}
// Set the relative CPU shares for this cgroup.
e.groups.CpuShares = int64(resources.CPU)
e.groups.Resources.CpuShares = int64(resources.CPU)
if resources.IOPS != 0 {
// Validate it is in an acceptable range.
@@ -371,7 +371,7 @@ func (e *LinuxExecutor) configureCgroups(resources *structs.Resources) error {
return fmt.Errorf("resources.IOPS must be between 10 and 1000: %d", resources.IOPS)
}
e.groups.BlkioWeight = uint16(resources.IOPS)
e.groups.Resources.BlkioWeight = uint16(resources.IOPS)
}
return nil

View File

@@ -251,7 +251,9 @@ func (s *Spawner) pollWait() *structs.WaitResult {
}
// Read after the process exits.
for _ = range time.Tick(5 * time.Second) {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for range ticker.C {
if !s.Alive() {
break
}
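
Editor's note: the hunk above replaces the range over time.Tick with an explicit time.NewTicker. A self-contained sketch of the difference (pollUntil is an illustrative name): time.Tick has no way to stop its underlying ticker, while NewTicker plus a deferred Stop releases it once polling ends.

package main

import (
	"fmt"
	"time"
)

// pollUntil polls every interval until done reports true. The deferred Stop
// releases the ticker, which a bare time.Tick can never do.
func pollUntil(interval time.Duration, done func() bool) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for range ticker.C {
		if done() {
			return
		}
	}
}

func main() {
	start := time.Now()
	pollUntil(100*time.Millisecond, func() bool {
		return time.Since(start) > 300*time.Millisecond
	})
	fmt.Println("done polling")
}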

View File

@@ -12,16 +12,21 @@ import (
"github.com/hashicorp/nomad/nomad/structs"
)
const (
consulAvailable = "available"
consulUnavailable = "unavailable"
)
// ConsulFingerprint is used to fingerprint the architecture
type ConsulFingerprint struct {
logger *log.Logger
client *consul.Client
logger *log.Logger
client *consul.Client
lastState string
}
// NewConsulFingerprint is used to create an OS fingerprint
func NewConsulFingerprint(logger *log.Logger) Fingerprint {
f := &ConsulFingerprint{logger: logger}
return f
return &ConsulFingerprint{logger: logger, lastState: consulUnavailable}
}
func (f *ConsulFingerprint) Fingerprint(config *client.Config, node *structs.Node) (bool, error) {
@@ -55,6 +60,13 @@ func (f *ConsulFingerprint) Fingerprint(config *client.Config, node *structs.Nod
if err != nil {
// Clear any attributes set by a previous fingerprint.
f.clearConsulAttributes(node)
// Print a message indicating that the Consul Agent is not available
// anymore
if f.lastState == consulAvailable {
f.logger.Printf("[INFO] fingerprint.consul: consul agent is unavailable")
}
f.lastState = consulUnavailable
return false, nil
}
@@ -68,6 +80,12 @@ func (f *ConsulFingerprint) Fingerprint(config *client.Config, node *structs.Nod
node.Attributes["consul.datacenter"],
node.Attributes["consul.name"])
// If the Consul Agent was previously unavailable print a message to
// indicate the Agent is available now
if f.lastState == consulUnavailable {
f.logger.Printf("[INFO] fingerprint.consul: consul agent is available")
}
f.lastState = consulAvailable
return true, nil
}
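
Editor's note: the fingerprinter now remembers the last observed state and logs only on transitions, which is what makes Nomad "less noisy when Consul is not running" (GH-567). A self-contained sketch of that edge-triggered logging; availabilityLogger is an illustrative name:

package main

import "fmt"

// availabilityLogger emits a message only when availability flips, not on
// every poll.
type availabilityLogger struct {
	lastState string
}

func (a *availabilityLogger) observe(available bool) {
	state := "unavailable"
	if available {
		state = "available"
	}
	if state != a.lastState {
		fmt.Printf("consul agent is %s\n", state)
	}
	a.lastState = state
}

func main() {
	l := &availabilityLogger{lastState: "unavailable"}
	for _, up := range []bool{false, false, true, true, false} {
		l.observe(up)
	}
}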

View File

@@ -80,7 +80,7 @@ func NewEnvAWSFingerprint(logger *log.Logger) Fingerprint {
}
func (f *EnvAWSFingerprint) Fingerprint(cfg *config.Config, node *structs.Node) (bool, error) {
if !isAWS() {
if !f.isAWS() {
return false, nil
}
@@ -161,7 +161,7 @@ func (f *EnvAWSFingerprint) Fingerprint(cfg *config.Config, node *structs.Node)
return true, nil
}
func isAWS() bool {
func (f *EnvAWSFingerprint) isAWS() bool {
// Read the internal metadata URL from the environment, allowing test files to
// provide their own
metadataURL := os.Getenv("AWS_ENV_URL")
@@ -178,7 +178,7 @@ func isAWS() bool {
// Query the metadata URL for the ami-id to verify we're on AWS
resp, err := client.Get(metadataURL + "ami-id")
if err != nil {
log.Printf("[DEBUG] fingerprint.env_aws: Error querying AWS Metadata URL, skipping")
f.logger.Printf("[DEBUG] fingerprint.env_aws: Error querying AWS Metadata URL, skipping")
return false
}
defer resp.Body.Close()
@@ -190,7 +190,7 @@ func isAWS() bool {
instanceID, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Printf("[DEBUG] fingerprint.env_aws: Error reading AWS Instance ID, skipping")
f.logger.Printf("[DEBUG] fingerprint.env_aws: Error reading AWS Instance ID, skipping")
return false
}

View File

@@ -1,89 +1,71 @@
package client
import (
"math/rand"
"time"
"github.com/hashicorp/nomad/nomad/structs"
)
// The errorCounter keeps track of the number of times a process has exited
// It returns the duration after which a task is restarted
// For Batch jobs, the interval is set to zero value since the takss
// will be restarted only upto maxAttempts times
type restartTracker interface {
nextRestart(exitCode int) (bool, time.Duration)
}
// jitter is the percent of jitter added to restart delays.
const jitter = 0.25
func newRestartTracker(jobType string, restartPolicy *structs.RestartPolicy) restartTracker {
switch jobType {
case structs.JobTypeService:
return &serviceRestartTracker{
maxAttempts: restartPolicy.Attempts,
startTime: time.Now(),
interval: restartPolicy.Interval,
delay: restartPolicy.Delay,
}
default:
return &batchRestartTracker{
maxAttempts: restartPolicy.Attempts,
delay: restartPolicy.Delay,
}
func newRestartTracker(policy *structs.RestartPolicy) *RestartTracker {
return &RestartTracker{
startTime: time.Now(),
policy: policy,
rand: rand.New(rand.NewSource(time.Now().Unix())),
}
}
// noRestartsTracker returns a RestartTracker that never restarts.
func noRestartsTracker() restartTracker {
return &batchRestartTracker{maxAttempts: 0}
type RestartTracker struct {
count int // Current number of attempts.
startTime time.Time // When the interval began
policy *structs.RestartPolicy
rand *rand.Rand
}
type batchRestartTracker struct {
maxAttempts int
delay time.Duration
count int
}
func (b *batchRestartTracker) increment() {
b.count += 1
}
func (b *batchRestartTracker) nextRestart(exitCode int) (bool, time.Duration) {
if b.count < b.maxAttempts && exitCode > 0 {
b.increment()
return true, b.delay
}
return false, 0
}
type serviceRestartTracker struct {
maxAttempts int
delay time.Duration
interval time.Duration
count int
startTime time.Time
}
func (s *serviceRestartTracker) increment() {
s.count += 1
}
func (s *serviceRestartTracker) nextRestart(exitCode int) (bool, time.Duration) {
defer s.increment()
windowEndTime := s.startTime.Add(s.interval)
func (r *RestartTracker) NextRestart(exitCode int) (bool, time.Duration) {
// Check if we have entered a new interval.
end := r.startTime.Add(r.policy.Interval)
now := time.Now()
// If the window of restart is over we wait until the delay duration
if now.After(windowEndTime) {
s.count = 0
s.startTime = time.Now()
return true, s.delay
if now.After(end) {
r.count = 0
r.startTime = now
return true, r.jitter()
}
// If we are within the delay duration and didn't exhaust all retries
if s.count < s.maxAttempts {
return true, s.delay
r.count++
// If we are under the attempts, restart with delay.
if r.count <= r.policy.Attempts {
return r.shouldRestart(exitCode), r.jitter()
}
// If we exhausted all the retries and are withing the time window
return true, windowEndTime.Sub(now)
// Don't restart since mode is "fail"
if r.policy.Mode == structs.RestartPolicyModeFail {
return false, 0
}
// Apply an artificial wait to enter the next interval
return r.shouldRestart(exitCode), end.Sub(now)
}
// shouldRestart returns whether a restart should occur based on the exit code
// and the RestartOnSuccess configuration.
func (r *RestartTracker) shouldRestart(exitCode int) bool {
return exitCode != 0 || r.policy.RestartOnSuccess
}
// jitter returns the delay time plus a jitter.
func (r *RestartTracker) jitter() time.Duration {
d := r.policy.Delay.Nanoseconds()
j := float64(r.rand.Int63n(d)) * jitter
return time.Duration(d + int64(j))
}
// Returns a tracker that never restarts.
func noRestartsTracker() *RestartTracker {
policy := &structs.RestartPolicy{Attempts: 0, Mode: structs.RestartPolicyModeFail}
return newRestartTracker(policy)
}
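
Editor's note: the rewritten tracker adds up to 25% jitter on top of the configured delay (the jitter constant above) so many failed tasks don't all restart in lock step. A self-contained sketch of that calculation; jitteredDelay and jitterFraction are illustrative stand-ins for RestartTracker.jitter and the jitter constant, not part of this commit:

package main

import (
	"fmt"
	"math/rand"
	"time"
)

// jitterFraction mirrors the jitter constant: up to 25% of a random slice of
// the delay is added on top of the base delay.
const jitterFraction = 0.25

func jitteredDelay(delay time.Duration, r *rand.Rand) time.Duration {
	d := delay.Nanoseconds()
	j := float64(r.Int63n(d)) * jitterFraction
	return time.Duration(d + int64(j))
}

func main() {
	r := rand.New(rand.NewSource(time.Now().Unix()))
	for i := 0; i < 3; i++ {
		// Always in [5s, 6.25s) for a 5s delay.
		fmt.Println(jitteredDelay(5*time.Second, r))
	}
}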

View File

@@ -1,78 +1,81 @@
package client
import (
"github.com/hashicorp/nomad/nomad/structs"
"testing"
"time"
"github.com/hashicorp/nomad/nomad/structs"
)
func TestTaskRunner_ServiceRestartCounter(t *testing.T) {
interval := 2 * time.Minute
delay := 1 * time.Second
attempts := 3
rt := newRestartTracker(structs.JobTypeService, &structs.RestartPolicy{Attempts: attempts, Interval: interval, Delay: delay})
func testPolicy(success bool, mode string) *structs.RestartPolicy {
return &structs.RestartPolicy{
Interval: 2 * time.Minute,
Delay: 1 * time.Second,
Attempts: 3,
Mode: mode,
RestartOnSuccess: success,
}
}
for i := 0; i < attempts; i++ {
actual, when := rt.nextRestart(127)
// withinJitter is a helper that returns whether the returned delay is within
// the jitter.
func withinJitter(expected, actual time.Duration) bool {
return float64(actual.Nanoseconds()-expected.Nanoseconds())/
float64(expected.Nanoseconds()) <= jitter
}
func TestClient_RestartTracker_ModeDelay(t *testing.T) {
t.Parallel()
p := testPolicy(true, structs.RestartPolicyModeDelay)
rt := newRestartTracker(p)
for i := 0; i < p.Attempts; i++ {
actual, when := rt.NextRestart(127)
if !actual {
t.Fatalf("should restart returned %v, actual %v", actual, true)
t.Fatalf("NextRestart() returned %v, want %v", actual, true)
}
if when != delay {
t.Fatalf("nextRestart() returned %v; want %v", when, delay)
if !withinJitter(p.Delay, when) {
t.Fatalf("NextRestart() returned %v; want %v+jitter", when, p.Delay)
}
}
time.Sleep(1 * time.Second)
// Follow up restarts should cause delay.
for i := 0; i < 3; i++ {
actual, when := rt.nextRestart(127)
actual, when := rt.NextRestart(127)
if !actual {
t.Fail()
}
if !(when > delay && when < interval) {
t.Fatalf("nextRestart() returned %v; want less than %v and more than %v", when, interval, delay)
if !(when > p.Delay && when < p.Interval) {
t.Fatalf("NextRestart() returned %v; want less than %v and more than %v", when, p.Interval, p.Delay)
}
}
}
func TestTaskRunner_BatchRestartCounter(t *testing.T) {
attempts := 2
interval := 1 * time.Second
delay := 1 * time.Second
rt := newRestartTracker(structs.JobTypeBatch,
&structs.RestartPolicy{Attempts: attempts,
Interval: interval,
Delay: delay,
},
)
for i := 0; i < attempts; i++ {
shouldRestart, when := rt.nextRestart(127)
if !shouldRestart {
t.Fatalf("should restart returned %v, actual %v", shouldRestart, true)
}
if when != delay {
t.Fatalf("Delay should be %v, actual: %v", delay, when)
}
}
actual, _ := rt.nextRestart(1)
if actual {
t.Fatalf("Expect %v, Actual: %v", false, actual)
}
}
func TestTaskRunner_BatchRestartOnSuccess(t *testing.T) {
attempts := 2
interval := 1 * time.Second
delay := 1 * time.Second
rt := newRestartTracker(structs.JobTypeBatch,
&structs.RestartPolicy{Attempts: attempts,
Interval: interval,
Delay: delay,
},
)
shouldRestart, _ := rt.nextRestart(0)
if shouldRestart {
t.Fatalf("should restart returned %v, expected: %v", shouldRestart, false)
func TestClient_RestartTracker_ModeFail(t *testing.T) {
t.Parallel()
p := testPolicy(true, structs.RestartPolicyModeFail)
rt := newRestartTracker(p)
for i := 0; i < p.Attempts; i++ {
actual, when := rt.NextRestart(127)
if !actual {
t.Fatalf("NextRestart() returned %v, want %v", actual, true)
}
if !withinJitter(p.Delay, when) {
t.Fatalf("NextRestart() returned %v; want %v+jitter", when, p.Delay)
}
}
// Next restart should cause fail
if actual, _ := rt.NextRestart(127); actual {
t.Fail()
}
}
func TestClient_RestartTracker_NoRestartOnSuccess(t *testing.T) {
t.Parallel()
p := testPolicy(false, structs.RestartPolicyModeDelay)
rt := newRestartTracker(p)
if shouldRestart, _ := rt.NextRestart(0); shouldRestart {
t.Fatalf("NextRestart() returned %v, expected: %v", shouldRestart, false)
}
}

View File

@@ -23,8 +23,8 @@ type TaskRunner struct {
updater TaskStateUpdater
logger *log.Logger
ctx *driver.ExecContext
allocID string
restartTracker restartTracker
alloc *structs.Allocation
restartTracker *RestartTracker
consulService *ConsulService
task *structs.Task
@@ -52,8 +52,8 @@ type TaskStateUpdater func(taskName string)
// NewTaskRunner is used to create a new task context
func NewTaskRunner(logger *log.Logger, config *config.Config,
updater TaskStateUpdater, ctx *driver.ExecContext,
allocID string, task *structs.Task, state *structs.TaskState,
restartTracker restartTracker, consulService *ConsulService) *TaskRunner {
alloc *structs.Allocation, task *structs.Task, state *structs.TaskState,
restartTracker *RestartTracker, consulService *ConsulService) *TaskRunner {
tc := &TaskRunner{
config: config,
@@ -62,7 +62,7 @@ func NewTaskRunner(logger *log.Logger, config *config.Config,
restartTracker: restartTracker,
consulService: consulService,
ctx: ctx,
allocID: allocID,
alloc: alloc,
task: task,
state: state,
updateCh: make(chan *structs.Task, 8),
@@ -85,7 +85,7 @@ func (r *TaskRunner) stateFilePath() string {
dirName := fmt.Sprintf("task-%s", hashHex)
// Generate the path
path := filepath.Join(r.config.StateDir, "alloc", r.allocID,
path := filepath.Join(r.config.StateDir, "alloc", r.alloc.ID,
dirName, "state.json")
return path
}
@@ -113,7 +113,7 @@ func (r *TaskRunner) RestoreState() error {
// In the case it fails, we relaunch the task in the Run() method.
if err != nil {
r.logger.Printf("[ERR] client: failed to open handle to task '%s' for alloc '%s': %v",
r.task.Name, r.allocID, err)
r.task.Name, r.alloc.ID, err)
return nil
}
r.handle = handle
@@ -176,7 +176,7 @@ func (r *TaskRunner) createDriver() (driver.Driver, error) {
driver, err := driver.NewDriver(r.task.Driver, driverCtx)
if err != nil {
err = fmt.Errorf("failed to create driver '%s' for alloc %s: %v",
r.task.Driver, r.allocID, err)
r.task.Driver, r.alloc.ID, err)
r.logger.Printf("[ERR] client: %s", err)
}
return driver, err
@@ -196,7 +196,7 @@ func (r *TaskRunner) startTask() error {
handle, err := driver.Start(r.ctx, r.task)
if err != nil {
r.logger.Printf("[ERR] client: failed to start task '%s' for alloc '%s': %v",
r.task.Name, r.allocID, err)
r.task.Name, r.alloc.ID, err)
e := structs.NewTaskEvent(structs.TaskDriverFailure).
SetDriverError(fmt.Errorf("failed to start: %v", err))
r.setState(structs.TaskStateDead, e)
@@ -211,7 +211,7 @@ func (r *TaskRunner) startTask() error {
func (r *TaskRunner) Run() {
defer close(r.waitCh)
r.logger.Printf("[DEBUG] client: starting task context for '%s' (alloc '%s')",
r.task.Name, r.allocID)
r.task.Name, r.alloc.ID)
r.run()
return
@@ -234,10 +234,7 @@ func (r *TaskRunner) run() {
destroyed := false
// Register the services defined by the task with Consul
r.consulService.Register(r.task, r.allocID)
// De-Register the services belonging to the task from consul
defer r.consulService.Deregister(r.task, r.allocID)
r.consulService.Register(r.task, r.alloc)
OUTER:
// Wait for updates
@@ -249,7 +246,7 @@ func (r *TaskRunner) run() {
// Update
r.task = update
if err := r.handle.Update(update); err != nil {
r.logger.Printf("[ERR] client: failed to update task '%s' for alloc '%s': %v", r.task.Name, r.allocID, err)
r.logger.Printf("[ERR] client: failed to update task '%s' for alloc '%s': %v", r.task.Name, r.alloc.ID, err)
}
case <-r.destroyCh:
// Avoid destroying twice
@@ -259,13 +256,16 @@ func (r *TaskRunner) run() {
// Send the kill signal, and use the WaitCh to block until complete
if err := r.handle.Kill(); err != nil {
r.logger.Printf("[ERR] client: failed to kill task '%s' for alloc '%s': %v", r.task.Name, r.allocID, err)
r.logger.Printf("[ERR] client: failed to kill task '%s' for alloc '%s': %v", r.task.Name, r.alloc.ID, err)
destroyErr = err
}
destroyed = true
}
}
// De-Register the services belonging to the task from consul
r.consulService.Deregister(r.task, r.alloc)
// If the user destroyed the task, we do not attempt to do any restarts.
if destroyed {
r.setState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskKilled).SetKillError(destroyErr))
@@ -274,16 +274,16 @@ func (r *TaskRunner) run() {
// Log whether the task was successful or not.
if !waitRes.Successful() {
r.logger.Printf("[ERR] client: failed to complete task '%s' for alloc '%s': %v", r.task.Name, r.allocID, waitRes)
r.logger.Printf("[ERR] client: failed to complete task '%s' for alloc '%s': %v", r.task.Name, r.alloc.ID, waitRes)
} else {
r.logger.Printf("[INFO] client: completed task '%s' for alloc '%s'", r.task.Name, r.allocID)
r.logger.Printf("[INFO] client: completed task '%s' for alloc '%s'", r.task.Name, r.alloc.ID)
}
// Check if we should restart. If not mark task as dead and exit.
shouldRestart, when := r.restartTracker.nextRestart(waitRes.ExitCode)
shouldRestart, when := r.restartTracker.NextRestart(waitRes.ExitCode)
waitEvent := r.waitErrorToEvent(waitRes)
if !shouldRestart {
r.logger.Printf("[INFO] client: Not restarting task: %v for alloc: %v ", r.task.Name, r.allocID)
r.logger.Printf("[INFO] client: Not restarting task: %v for alloc: %v ", r.task.Name, r.alloc.ID)
r.setState(structs.TaskStateDead, waitEvent)
return
}
@@ -329,7 +329,7 @@ func (r *TaskRunner) Update(update *structs.Task) {
case r.updateCh <- update:
default:
r.logger.Printf("[ERR] client: dropping task update '%s' (alloc '%s')",
update.Name, r.allocID)
update.Name, r.alloc.ID)
}
}

View File

@@ -32,7 +32,7 @@ func testTaskRunner(restarts bool) (*MockTaskStateUpdater, *TaskRunner) {
upd := &MockTaskStateUpdater{}
alloc := mock.Alloc()
task := alloc.Job.TaskGroups[0].Tasks[0]
consulClient, _ := NewConsulService(logger, "127.0.0.1:8500", "", "", false, false)
consulClient, _ := NewConsulService(&consulServiceConfig{logger, "127.0.0.1:8500", "", "", false, false, &structs.Node{}})
// Initialize the port listing. This should be done by the offer process but
// we have a mock so that doesn't happen.
task.Resources.Networks[0].ReservedPorts = []structs.Port{{"", 80}}
@@ -42,13 +42,13 @@ func testTaskRunner(restarts bool) (*MockTaskStateUpdater, *TaskRunner) {
ctx := driver.NewExecContext(allocDir, alloc.ID)
rp := structs.NewRestartPolicy(structs.JobTypeService)
restartTracker := newRestartTracker(structs.JobTypeService, rp)
restartTracker := newRestartTracker(rp)
if !restarts {
restartTracker = noRestartsTracker()
}
state := alloc.TaskStates[task.Name]
tr := NewTaskRunner(logger, conf, upd.Update, ctx, alloc.ID, task, state, restartTracker, consulClient)
tr := NewTaskRunner(logger, conf, upd.Update, ctx, mock.Alloc(), task, state, restartTracker, consulClient)
return upd, tr
}
@@ -164,9 +164,9 @@ func TestTaskRunner_SaveRestoreState(t *testing.T) {
}
// Create a new task runner
consulClient, _ := NewConsulService(tr.logger, "127.0.0.1:8500", "", "", false, false)
consulClient, _ := NewConsulService(&consulServiceConfig{tr.logger, "127.0.0.1:8500", "", "", false, false, &structs.Node{}})
tr2 := NewTaskRunner(tr.logger, tr.config, upd.Update,
tr.ctx, tr.allocID, &structs.Task{Name: tr.task.Name}, tr.state, tr.restartTracker,
tr.ctx, tr.alloc, &structs.Task{Name: tr.task.Name}, tr.state, tr.restartTracker,
consulClient)
if err := tr2.RestoreState(); err != nil {
t.Fatalf("err: %v", err)

View File

@@ -107,15 +107,6 @@ func (c *Command) readConfig() *Config {
return nil
}
if cmdConfig.Server.RetryInterval != "" {
dur, err := time.ParseDuration(cmdConfig.Server.RetryInterval)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error parsing retry interval: %s", err))
return nil
}
cmdConfig.Server.retryInterval = dur
}
// Split the servers.
if servers != "" {
cmdConfig.Client.Servers = strings.Split(servers, ",")
@@ -188,6 +179,14 @@ func (c *Command) readConfig() *Config {
return config
}
// Parse the RetryInterval.
dur, err := time.ParseDuration(config.Server.RetryInterval)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error parsing retry interval: %s", err))
return nil
}
config.Server.retryInterval = dur
// Check that the server is running in at least one mode.
if !(config.Server.Enabled || config.Client.Enabled) {
c.Ui.Error("Must specify either server, client or dev mode for the agent.")
@@ -356,7 +355,7 @@ func (c *Command) Run(args []string) int {
}
// Log config files
if len(config.Files) > 1 {
if len(config.Files) > 0 {
c.Ui.Info(fmt.Sprintf("Loaded configuration from %s", strings.Join(config.Files, ", ")))
} else {
c.Ui.Info("No configuration files loaded")

View File

@@ -3,7 +3,6 @@ package agent
import (
"fmt"
"io/ioutil"
"log"
"os"
"strings"
"testing"
@@ -112,7 +111,7 @@ func TestRetryJoin(t *testing.T) {
go func() {
if code := cmd.Run(args); code != 0 {
log.Printf("bad: %d", code)
t.Logf("bad: %d", code)
}
close(doneCh)
}()

View File

@@ -8,6 +8,7 @@ import (
"os"
"path/filepath"
"runtime"
"sort"
"strings"
"time"
@@ -613,7 +614,7 @@ func LoadConfig(path string) (*Config, error) {
if fi.IsDir() {
return LoadConfigDir(path)
}
return LoadConfigFile(path)
return LoadConfigFile(filepath.Clean(path))
}
// LoadConfigString is used to parse a config string
@@ -705,6 +706,8 @@ func LoadConfigDir(dir string) (*Config, error) {
return &Config{}, nil
}
sort.Strings(files)
var result *Config
for _, f := range files {
config, err := LoadConfigFile(f)
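
Editor's note: the agent now sorts the discovered config files before merging them ("configuration: Sort configuration files [GH-588]"), so the merge order no longer depends on directory-read order. A tiny self-contained sketch of that behavior; the file names are made up:

package main

import (
	"fmt"
	"path/filepath"
	"sort"
)

func main() {
	// Whatever order the filesystem returns, the merge order is stable.
	files := []string{"zz-overrides.hcl", "10-base.hcl", "50-client.hcl"}
	sort.Strings(files)
	for _, f := range files {
		// filepath.Clean mirrors the LoadConfig change above.
		fmt.Println(filepath.Clean(f))
	}
}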

View File

@@ -327,7 +327,7 @@ func TestConfig_LoadConfigsFileOrder(t *testing.T) {
config := config1.Merge(config2)
if !reflect.DeepEqual(config.Files, expected) {
t.Errorf("Loaded configs don't match\nExpected\n%+vGot\n%+v\n",
t.Errorf("Loaded configs don't match\nwant: %+v\n got: %+v\n",
expected, config.Files)
}
}

View File

@@ -104,15 +104,24 @@ job "example" {
# Defaults to 1
# count = 1
# Restart Policy - This block defines the restart policy for TaskGroups,
# the attempts value defines the number of restarts Nomad will do if Tasks
# in this TaskGroup fails in a rolling window of interval duration
# The delay value makes Nomad wait for that duration to restart after a Task
# fails or crashes.
# Configure the restart policy for the task group. If not provided, a
# default is used based on the job type.
restart {
interval = "5m"
# The number of attempts to run the job within the specified interval.
attempts = 10
interval = "5m"
# A delay between a task failing and a restart occurring.
delay = "25s"
# Whether the tasks should be restarted if they exit successfully.
on_success = true
# Mode controls what happens when a task has restarted "attempts"
# times within the interval. "delay" mode delays the next restart
# till the next interval. "fail" mode does not restart the task if
# "attempts" has been hit within the interval.
mode = "delay"
}
# Define a task to run

View File

@@ -80,6 +80,9 @@ func (c *RunCommand) Run(args []string) int {
return 1
}
// Initialize any fields that need to be.
job.InitFields()
// Check that the job is valid
if err := job.Validate(); err != nil {
c.Ui.Error(fmt.Sprintf("Error validating job: %s", err))

View File

@@ -77,6 +77,7 @@ func TestRunCommand_Fails(t *testing.T) {
defer os.Remove(fh3.Name())
_, err = fh3.WriteString(`
job "job1" {
type = "service"
datacenters = [ "dc1" ]
group "group1" {
count = 1

View File

@@ -48,6 +48,9 @@ func (c *ValidateCommand) Run(args []string) int {
return 1
}
// Initialize any fields that need to be.
job.InitFields()
// Check that the job is valid
if err := job.Validate(); err != nil {
c.Ui.Error(fmt.Sprintf("Error validating job: %s", err))

View File

@@ -24,6 +24,7 @@ func TestValidateCommand(t *testing.T) {
defer os.Remove(fh.Name())
_, err = fh.WriteString(`
job "job1" {
type = "service"
datacenters = [ "dc1" ]
group "group1" {
count = 1

View File

@@ -9,7 +9,7 @@ sudo apt-get install -y unzip curl wget vim
# Download Nomad
echo Fetching Nomad...
cd /tmp/
curl -sSL https://releases.hashicorp.com/nomad/0.2.1/nomad_0.2.1_linux_amd64.zip -o nomad.zip
curl -sSL https://releases.hashicorp.com/nomad/0.2.3/nomad_0.2.3_linux_amd64.zip -o nomad.zip
echo Installing Nomad...
unzip nomad.zip

View File

@@ -1,35 +0,0 @@
// +build !windows
package userlookup
import (
"fmt"
"io/ioutil"
"os/user"
"strings"
)
// Lookup checks if the given username or uid is present in /etc/passwd
// and returns the user struct.
// If the username is not found, an error is returned.
// Credit to @creak, https://github.com/docker/docker/pull/1096
func Lookup(uid string) (*user.User, error) {
file, err := ioutil.ReadFile("/etc/passwd")
if err != nil {
return nil, err
}
for _, line := range strings.Split(string(file), "\n") {
data := strings.Split(line, ":")
if len(data) > 5 && (data[0] == uid || data[2] == uid) {
return &user.User{
Uid: data[2],
Gid: data[3],
Username: data[0],
Name: data[4],
HomeDir: data[5],
}, nil
}
}
return nil, fmt.Errorf("User not found in /etc/passwd")
}
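
Editor's note: the custom /etc/passwd parser above is deleted; the alloc-dir and exec-driver hunks earlier in this commit call the standard library's os/user.Lookup instead. A self-contained sketch of the replacement call:

package main

import (
	"fmt"
	"os/user"
	"strconv"
)

func main() {
	u, err := user.Lookup("nobody")
	if err != nil {
		fmt.Println("lookup failed:", err)
		return
	}
	// Uid and Gid come back as strings, as the deleted helper also returned.
	uid, _ := strconv.Atoi(u.Uid)
	gid, _ := strconv.Atoi(u.Gid)
	fmt.Println(u.Username, uid, gid, u.HomeDir)
}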

View File

@@ -159,10 +159,9 @@ func parseJob(result *structs.Job, list *ast.ObjectList) error {
result.TaskGroups = make([]*structs.TaskGroup, len(tasks), len(tasks)*2)
for i, t := range tasks {
result.TaskGroups[i] = &structs.TaskGroup{
Name: t.Name,
Count: 1,
Tasks: []*structs.Task{t},
RestartPolicy: structs.NewRestartPolicy(result.Type),
Name: t.Name,
Count: 1,
Tasks: []*structs.Task{t},
}
}
}
@@ -230,11 +229,10 @@ func parseGroups(result *structs.Job, list *ast.ObjectList) error {
return err
}
}
g.RestartPolicy = structs.NewRestartPolicy(result.Type)
// Parse restart policy
if o := listVal.Filter("restart"); len(o.Items) > 0 {
if err := parseRestartPolicy(g.RestartPolicy, o); err != nil {
if err := parseRestartPolicy(&g.RestartPolicy, o); err != nil {
return err
}
}
@@ -267,12 +265,9 @@ func parseGroups(result *structs.Job, list *ast.ObjectList) error {
return nil
}
func parseRestartPolicy(final *structs.RestartPolicy, list *ast.ObjectList) error {
func parseRestartPolicy(final **structs.RestartPolicy, list *ast.ObjectList) error {
list = list.Elem()
if len(list.Items) == 0 {
return nil
}
if len(list.Items) != 1 {
if len(list.Items) > 1 {
return fmt.Errorf("only one 'restart' block allowed")
}
@@ -297,7 +292,7 @@ func parseRestartPolicy(final *structs.RestartPolicy, list *ast.ObjectList) erro
return err
}
*final = result
*final = &result
return nil
}
@@ -623,13 +618,16 @@ func parsePorts(networkObj *ast.ObjectList, nw *structs.NetworkResource) error {
portsObjList := networkObj.Filter("port")
knownPortLabels := make(map[string]bool)
for _, port := range portsObjList.Items {
if len(port.Keys) == 0 {
return fmt.Errorf("ports must be named")
}
label := port.Keys[0].Token.Value().(string)
if !reDynamicPorts.MatchString(label) {
return errPortLabel
}
l := strings.ToLower(label)
if knownPortLabels[l] {
return fmt.Errorf("Found a port label collision: %s", label)
return fmt.Errorf("found a port label collision: %s", label)
}
var p map[string]interface{}
var res structs.Port

View File

@@ -48,11 +48,6 @@ func TestParse(t *testing.T) {
&structs.TaskGroup{
Name: "outside",
Count: 1,
RestartPolicy: &structs.RestartPolicy{
Attempts: 2,
Interval: 1 * time.Minute,
Delay: 15 * time.Second,
},
Tasks: []*structs.Task{
&structs.Task{
Name: "outside",
@@ -83,9 +78,11 @@ func TestParse(t *testing.T) {
"elb_checks": "3",
},
RestartPolicy: &structs.RestartPolicy{
Interval: 10 * time.Minute,
Attempts: 5,
Delay: 15 * time.Second,
Interval: 10 * time.Minute,
Attempts: 5,
Delay: 15 * time.Second,
RestartOnSuccess: true,
Mode: "delay",
},
Tasks: []*structs.Task{
&structs.Task{
@@ -96,13 +93,11 @@ func TestParse(t *testing.T) {
},
Services: []*structs.Service{
{
Id: "",
Name: "binstore-storagelocker-binsl-binstore",
Tags: []string{"foo", "bar"},
PortLabel: "http",
Checks: []*structs.ServiceCheck{
{
Id: "",
Name: "check-name",
Type: "tcp",
Interval: 10 * time.Second,
@@ -273,11 +268,6 @@ func TestParse(t *testing.T) {
&structs.TaskGroup{
Name: "bar",
Count: 1,
RestartPolicy: &structs.RestartPolicy{
Attempts: 2,
Interval: 1 * time.Minute,
Delay: 15 * time.Second,
},
Tasks: []*structs.Task{
&structs.Task{
Name: "bar",
@@ -344,7 +334,7 @@ func TestOverlappingPorts(t *testing.T) {
t.Fatalf("Expected an error")
}
if !strings.Contains(err.Error(), "Found a port label collision") {
if !strings.Contains(err.Error(), "found a port label collision") {
t.Fatalf("Expected collision error; got %v", err)
}
}

View File

@@ -35,6 +35,8 @@ job "binstore-storagelocker" {
attempts = 5
interval = "10m"
delay = "15s"
on_success = true
mode = "delay"
}
task "binstore" {
driver = "docker"

View File

@@ -131,6 +131,14 @@ type Config struct {
// for GC. This gives users some time to debug a failed evaluation.
EvalGCThreshold time.Duration
// JobGCInterval is how often we dispatch a job to GC jobs that are
// available for garbage collection.
JobGCInterval time.Duration
// JobGCThreshold is how old a job must be before it is eligible for GC. This gives
// the user time to inspect the job.
JobGCThreshold time.Duration
// NodeGCInterval is how often we dispatch a job to GC failed nodes.
NodeGCInterval time.Duration
@@ -202,6 +210,8 @@ func DefaultConfig() *Config {
ReconcileInterval: 60 * time.Second,
EvalGCInterval: 5 * time.Minute,
EvalGCThreshold: 1 * time.Hour,
JobGCInterval: 5 * time.Minute,
JobGCThreshold: 4 * time.Hour,
NodeGCInterval: 5 * time.Minute,
NodeGCThreshold: 24 * time.Hour,
EvalNackTimeout: 60 * time.Second,

View File

@@ -33,11 +33,90 @@ func (s *CoreScheduler) Process(eval *structs.Evaluation) error {
return s.evalGC(eval)
case structs.CoreJobNodeGC:
return s.nodeGC(eval)
case structs.CoreJobJobGC:
return s.jobGC(eval)
default:
return fmt.Errorf("core scheduler cannot handle job '%s'", eval.JobID)
}
}
// jobGC is used to garbage collect eligible jobs.
func (c *CoreScheduler) jobGC(eval *structs.Evaluation) error {
// Get all the jobs eligible for garbage collection.
iter, err := c.snap.JobsByGC(true)
if err != nil {
return err
}
// Get the time table to calculate GC cutoffs.
tt := c.srv.fsm.TimeTable()
cutoff := time.Now().UTC().Add(-1 * c.srv.config.JobGCThreshold)
oldThreshold := tt.NearestIndex(cutoff)
c.srv.logger.Printf("[DEBUG] sched.core: job GC: scanning before index %d (%v)",
oldThreshold, c.srv.config.JobGCThreshold)
// Collect the allocations, evaluations and jobs to GC
var gcAlloc, gcEval, gcJob []string
OUTER:
for i := iter.Next(); i != nil; i = iter.Next() {
job := i.(*structs.Job)
// Ignore new jobs.
if job.CreateIndex > oldThreshold {
continue
}
evals, err := c.snap.EvalsByJob(job.ID)
if err != nil {
c.srv.logger.Printf("[ERR] sched.core: failed to get evals for job %s: %v", job.ID, err)
continue
}
for _, eval := range evals {
gc, allocs, err := c.gcEval(eval, oldThreshold)
if err != nil || !gc {
continue OUTER
}
gcEval = append(gcEval, eval.ID)
gcAlloc = append(gcAlloc, allocs...)
}
// Job is eligible for garbage collection
gcJob = append(gcJob, job.ID)
}
// Fast-path the nothing case
if len(gcEval) == 0 && len(gcAlloc) == 0 && len(gcJob) == 0 {
return nil
}
c.srv.logger.Printf("[DEBUG] sched.core: job GC: %d jobs, %d evaluations, %d allocs eligible",
len(gcJob), len(gcEval), len(gcAlloc))
// Reap the evals and allocs
if err := c.evalReap(gcEval, gcAlloc); err != nil {
return err
}
// Call to the leader to deregister the jobs.
for _, job := range gcJob {
req := structs.JobDeregisterRequest{
JobID: job,
WriteRequest: structs.WriteRequest{
Region: c.srv.config.Region,
},
}
var resp structs.JobDeregisterResponse
if err := c.srv.RPC("Job.Deregister", &req, &resp); err != nil {
c.srv.logger.Printf("[ERR] sched.core: job deregister failed: %v", err)
return err
}
}
return nil
}
// evalGC is used to garbage collect old evaluations
func (c *CoreScheduler) evalGC(eval *structs.Evaluation) error {
// Iterate over the evaluations
@@ -57,39 +136,16 @@ func (c *CoreScheduler) evalGC(eval *structs.Evaluation) error {
// Collect the allocations and evaluations to GC
var gcAlloc, gcEval []string
OUTER:
for {
raw := iter.Next()
if raw == nil {
break
}
for raw := iter.Next(); raw != nil; raw = iter.Next() {
eval := raw.(*structs.Evaluation)
// Ignore non-terminal and new evaluations
if !eval.TerminalStatus() || eval.ModifyIndex > oldThreshold {
continue
}
// Get the allocations by eval
allocs, err := c.snap.AllocsByEval(eval.ID)
gc, allocs, err := c.gcEval(eval, oldThreshold)
if err != nil {
c.srv.logger.Printf("[ERR] sched.core: failed to get allocs for eval %s: %v",
eval.ID, err)
continue
return err
}
// Scan the allocations to ensure they are terminal and old
for _, alloc := range allocs {
if !alloc.TerminalStatus() || alloc.ModifyIndex > oldThreshold {
continue OUTER
}
}
// Evaluation is eligible for garbage collection
gcEval = append(gcEval, eval.ID)
for _, alloc := range allocs {
gcAlloc = append(gcAlloc, alloc.ID)
if gc {
gcEval = append(gcEval, eval.ID)
gcAlloc = append(gcAlloc, allocs...)
}
}
@@ -100,10 +156,52 @@ OUTER:
c.srv.logger.Printf("[DEBUG] sched.core: eval GC: %d evaluations, %d allocs eligible",
len(gcEval), len(gcAlloc))
return c.evalReap(gcEval, gcAlloc)
}
// gcEval returns whether the eval should be garbage collected given a raft
// threshold index. The eval is not eligible for garbage collection if it or
// any of its allocs are newer than the threshold. If the eval should be
// garbage collected, the IDs of the allocs that should be removed with it
// are also returned.
func (c *CoreScheduler) gcEval(eval *structs.Evaluation, thresholdIndex uint64) (
bool, []string, error) {
// Ignore non-terminal and new evaluations
if !eval.TerminalStatus() || eval.ModifyIndex > thresholdIndex {
return false, nil, nil
}
// Get the allocations by eval
allocs, err := c.snap.AllocsByEval(eval.ID)
if err != nil {
c.srv.logger.Printf("[ERR] sched.core: failed to get allocs for eval %s: %v",
eval.ID, err)
return false, nil, err
}
// Scan the allocations to ensure they are terminal and old
for _, alloc := range allocs {
if !alloc.TerminalStatus() || alloc.ModifyIndex > thresholdIndex {
return false, nil, nil
}
}
allocIds := make([]string, len(allocs))
for i, alloc := range allocs {
allocIds[i] = alloc.ID
}
// Evaluation is eligible for garbage collection
return true, allocIds, nil
}
// evalReap contacts the leader and issues a reap on the passed evals and
// allocs.
func (c *CoreScheduler) evalReap(evals, allocs []string) error {
// Call to the leader to issue the reap
req := structs.EvalDeleteRequest{
Evals: gcEval,
Allocs: gcAlloc,
Evals: evals,
Allocs: allocs,
WriteRequest: structs.WriteRequest{
Region: c.srv.config.Region,
},
@@ -113,6 +211,7 @@ OUTER:
c.srv.logger.Printf("[ERR] sched.core: eval reap failed: %v", err)
return err
}
return nil
}

View File

@@ -111,3 +111,107 @@ func TestCoreScheduler_NodeGC(t *testing.T) {
t.Fatalf("bad: %v", out)
}
}
func TestCoreScheduler_JobGC(t *testing.T) {
tests := []struct {
test, evalStatus, allocStatus string
shouldExist bool
}{
{
test: "Terminal",
evalStatus: structs.EvalStatusFailed,
allocStatus: structs.AllocDesiredStatusFailed,
shouldExist: false,
},
{
test: "Has Alloc",
evalStatus: structs.EvalStatusFailed,
allocStatus: structs.AllocDesiredStatusRun,
shouldExist: true,
},
{
test: "Has Eval",
evalStatus: structs.EvalStatusPending,
allocStatus: structs.AllocDesiredStatusFailed,
shouldExist: true,
},
}
for _, test := range tests {
s1 := testServer(t, nil)
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)
// Insert job.
state := s1.fsm.State()
job := mock.Job()
job.GC = true
err := state.UpsertJob(1000, job)
if err != nil {
t.Fatalf("test(%s) err: %v", test.test, err)
}
// Insert eval
eval := mock.Eval()
eval.JobID = job.ID
eval.Status = test.evalStatus
err = state.UpsertEvals(1001, []*structs.Evaluation{eval})
if err != nil {
t.Fatalf("test(%s) err: %v", test.test, err)
}
// Insert alloc
alloc := mock.Alloc()
alloc.JobID = job.ID
alloc.EvalID = eval.ID
alloc.DesiredStatus = test.allocStatus
err = state.UpsertAllocs(1002, []*structs.Allocation{alloc})
if err != nil {
t.Fatalf("test(%s) err: %v", test.test, err)
}
// Update the time tables to make this work
tt := s1.fsm.TimeTable()
tt.Witness(2000, time.Now().UTC().Add(-1*s1.config.JobGCThreshold))
// Create a core scheduler
snap, err := state.Snapshot()
if err != nil {
t.Fatalf("test(%s) err: %v", test.test, err)
}
core := NewCoreScheduler(s1, snap)
// Attempt the GC
gc := s1.coreJobEval(structs.CoreJobJobGC)
gc.ModifyIndex = 2000
err = core.Process(gc)
if err != nil {
t.Fatalf("test(%s) err: %v", test.test, err)
}
// Should still exist
out, err := state.JobByID(job.ID)
if err != nil {
t.Fatalf("test(%s) err: %v", test.test, err)
}
if (test.shouldExist && out == nil) || (!test.shouldExist && out != nil) {
t.Fatalf("test(%s) bad: %v", test.test, out)
}
outE, err := state.EvalByID(eval.ID)
if err != nil {
t.Fatalf("test(%s) err: %v", test.test, err)
}
if (test.shouldExist && outE == nil) || (!test.shouldExist && outE != nil) {
t.Fatalf("test(%s) bad: %v", test.test, out)
}
outA, err := state.AllocByID(alloc.ID)
if err != nil {
t.Fatalf("test(%s) err: %v", test.test, err)
}
if (test.shouldExist && outA == nil) || (!test.shouldExist && outA != nil) {
t.Fatalf("test(%s) bad: %v", test.test, outA)
}
}
}

View File

@@ -1,6 +1,7 @@
package nomad
import (
"errors"
"fmt"
"time"
@@ -25,12 +26,17 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis
if args.Job == nil {
return fmt.Errorf("missing job for registration")
}
if err := args.Job.Validate(); err != nil {
if err := j.checkBlacklist(args.Job); err != nil {
return err
}
// Initialize all the fields of services
args.Job.InitAllServiceFields()
// Initialize the job fields (sets defaults and any necessary init work).
args.Job.InitFields()
if err := args.Job.Validate(); err != nil {
return err
}
if args.Job.Type == structs.JobTypeCore {
return fmt.Errorf("job type cannot be core")
@@ -75,6 +81,16 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis
return nil
}
// checkBlacklist returns an error if the user has set any blacklisted field in
// the job.
func (j *Job) checkBlacklist(job *structs.Job) error {
if job.GC {
return errors.New("GC field of a job is used only internally and should not be set by user")
}
return nil
}
// Evaluate is used to force a job for re-evaluation
func (j *Job) Evaluate(args *structs.JobEvaluateRequest, reply *structs.JobRegisterResponse) error {
if done, err := j.srv.forward("Job.Evaluate", args, args, reply); done {

View File

@@ -171,6 +171,68 @@ func TestJobEndpoint_Register_Existing(t *testing.T) {
}
}
func TestJobEndpoint_Register_Batch(t *testing.T) {
s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})
defer s1.Shutdown()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
// Create the register request
job := mock.Job()
job.Type = structs.JobTypeBatch
req := &structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{Region: "global"},
}
// Fetch the response
var resp structs.JobRegisterResponse
if err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp); err != nil {
t.Fatalf("err: %v", err)
}
if resp.Index == 0 {
t.Fatalf("bad index: %d", resp.Index)
}
// Check for the node in the FSM
state := s1.fsm.State()
out, err := state.JobByID(job.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if out == nil {
t.Fatalf("expected job")
}
if !out.GC {
t.Fatal("expect batch job to be made garbage collectible")
}
}
func TestJobEndpoint_Register_GC_Set(t *testing.T) {
s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})
defer s1.Shutdown()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
// Create the register request
job := mock.Job()
job.GC = true
req := &structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{Region: "global"},
}
// Fetch the response
var resp structs.JobRegisterResponse
if err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp); err == nil {
t.Fatalf("expect err")
}
}
func TestJobEndpoint_Evaluate(t *testing.T) {
s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
@@ -359,15 +421,12 @@ func TestJobEndpoint_GetJob(t *testing.T) {
for tgix, tg := range j.TaskGroups {
for tidx, t := range tg.Tasks {
for sidx, service := range t.Services {
service.Id = resp2.Job.TaskGroups[tgix].Tasks[tidx].Services[sidx].Id
for cidx, check := range service.Checks {
check.Name = resp2.Job.TaskGroups[tgix].Tasks[tidx].Services[sidx].Checks[cidx].Name
check.Id = resp2.Job.TaskGroups[tgix].Tasks[tidx].Services[sidx].Checks[cidx].Id
}
}
}
}
j.TaskGroups[0].Tasks[0].Services[0].Id = resp2.Job.TaskGroups[0].Tasks[0].Services[0].Id
if !reflect.DeepEqual(j, resp2.Job) {
t.Fatalf("bad: %#v %#v", job, resp2.Job)

View File

@@ -173,6 +173,8 @@ func (s *Server) schedulePeriodic(stopCh chan struct{}) {
defer evalGC.Stop()
nodeGC := time.NewTicker(s.config.NodeGCInterval)
defer nodeGC.Stop()
jobGC := time.NewTicker(s.config.JobGCInterval)
defer jobGC.Stop()
for {
select {
@@ -180,6 +182,8 @@ func (s *Server) schedulePeriodic(stopCh chan struct{}) {
s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobEvalGC))
case <-nodeGC.C:
s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobNodeGC))
case <-jobGC.C:
s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobJobGC))
case <-stopCh:
return
}

View File

@@ -76,9 +76,11 @@ func Job() *structs.Job {
Name: "web",
Count: 10,
RestartPolicy: &structs.RestartPolicy{
Attempts: 3,
Interval: 10 * time.Minute,
Delay: 1 * time.Minute,
Attempts: 3,
Interval: 10 * time.Minute,
Delay: 1 * time.Minute,
RestartOnSuccess: true,
Mode: structs.RestartPolicyModeDelay,
},
Tasks: []*structs.Task{
&structs.Task{
@@ -95,6 +97,10 @@ func Job() *structs.Job {
Name: "${TASK}-frontend",
PortLabel: "http",
},
{
Name: "${TASK}-admin",
PortLabel: "admin",
},
},
Resources: &structs.Resources{
CPU: 500,
@@ -102,7 +108,7 @@ func Job() *structs.Job {
Networks: []*structs.NetworkResource{
&structs.NetworkResource{
MBits: 50,
DynamicPorts: []structs.Port{{Label: "http"}},
DynamicPorts: []structs.Port{{Label: "http"}, {Label: "admin"}},
},
},
},
@@ -122,6 +128,7 @@ func Job() *structs.Job {
CreateIndex: 42,
ModifyIndex: 99,
}
job.InitFields()
return job
}
@@ -146,9 +153,11 @@ func SystemJob() *structs.Job {
Name: "web",
Count: 1,
RestartPolicy: &structs.RestartPolicy{
Attempts: 3,
Interval: 10 * time.Minute,
Delay: 1 * time.Minute,
Attempts: 3,
Interval: 10 * time.Minute,
Delay: 1 * time.Minute,
RestartOnSuccess: true,
Mode: structs.RestartPolicyModeDelay,
},
Tasks: []*structs.Task{
&structs.Task{
@@ -226,11 +235,6 @@ func Alloc() *structs.Allocation {
},
},
},
TaskStates: map[string]*structs.TaskState{
"web": &structs.TaskState{
State: structs.TaskStatePending,
},
},
Job: Job(),
DesiredStatus: structs.AllocDesiredStatusRun,
ClientStatus: structs.AllocClientStatusPending,

View File

@@ -4,6 +4,7 @@ import (
"fmt"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/nomad/structs"
)
// stateStoreSchema is used to return the schema for the state store
@@ -100,10 +101,29 @@ func jobTableSchema() *memdb.TableSchema {
Lowercase: false,
},
},
"gc": &memdb.IndexSchema{
Name: "gc",
AllowMissing: false,
Unique: false,
Indexer: &memdb.ConditionalIndex{
Conditional: jobIsGCable,
},
},
},
}
}
// jobIsGCable satisfies the ConditionalIndexFunc interface and creates an index
// on whether a job is eligible for garbage collection.
func jobIsGCable(obj interface{}) (bool, error) {
j, ok := obj.(*structs.Job)
if !ok {
return false, fmt.Errorf("Unexpected type: %v", obj)
}
return j.GC, nil
}
// evalTableSchema returns the MemDB schema for the eval table.
// This table is used to store all the evaluations that are pending
// or recently completed.

View File

@@ -398,6 +398,18 @@ func (s *StateStore) JobsByScheduler(schedulerType string) (memdb.ResultIterator
return iter, nil
}
// JobsByGC returns an iterator over all jobs eligible or ineligible for garbage
// collection.
func (s *StateStore) JobsByGC(gc bool) (memdb.ResultIterator, error) {
txn := s.db.Txn(false)
iter, err := txn.Get("jobs", "gc", gc)
if err != nil {
return nil, err
}
return iter, nil
}
// UpsertEvaluation is used to upsert an evaluation
func (s *StateStore) UpsertEvals(index uint64, evals []*structs.Evaluation) error {
txn := s.db.Txn(true)
@@ -595,6 +607,7 @@ func (s *StateStore) UpdateAllocFromClient(index uint64, alloc *structs.Allocati
// Pull in anything the client is the authority on
copyAlloc.ClientStatus = alloc.ClientStatus
copyAlloc.ClientDescription = alloc.ClientDescription
copyAlloc.TaskStates = alloc.TaskStates
// Update the modify index
copyAlloc.ModifyIndex = index

View File

@@ -471,6 +471,63 @@ func TestStateStore_JobsByScheduler(t *testing.T) {
}
}
func TestStateStore_JobsByGC(t *testing.T) {
state := testStateStore(t)
var gc, nonGc []*structs.Job
for i := 0; i < 10; i++ {
job := mock.Job()
nonGc = append(nonGc, job)
if err := state.UpsertJob(1000+uint64(i), job); err != nil {
t.Fatalf("err: %v", err)
}
}
for i := 0; i < 10; i++ {
job := mock.Job()
job.GC = true
gc = append(gc, job)
if err := state.UpsertJob(2000+uint64(i), job); err != nil {
t.Fatalf("err: %v", err)
}
}
iter, err := state.JobsByGC(true)
if err != nil {
t.Fatalf("err: %v", err)
}
var outGc []*structs.Job
for i := iter.Next(); i != nil; i = iter.Next() {
outGc = append(outGc, i.(*structs.Job))
}
iter, err = state.JobsByGC(false)
if err != nil {
t.Fatalf("err: %v", err)
}
var outNonGc []*structs.Job
for i := iter.Next(); i != nil; i = iter.Next() {
outNonGc = append(outNonGc, i.(*structs.Job))
}
sort.Sort(JobIDSort(gc))
sort.Sort(JobIDSort(nonGc))
sort.Sort(JobIDSort(outGc))
sort.Sort(JobIDSort(outNonGc))
if !reflect.DeepEqual(gc, outGc) {
t.Fatalf("bad: %#v %#v", gc, outGc)
}
if !reflect.DeepEqual(nonGc, outNonGc) {
t.Fatalf("bad: %#v %#v", nonGc, outNonGc)
}
}
func TestStateStore_RestoreJob(t *testing.T) {
state := testStateStore(t)
job := mock.Job()

View File

@@ -22,19 +22,27 @@ func TestRemoveAllocs(t *testing.T) {
}
}
func TestFilterTerminalALlocs(t *testing.T) {
func TestFilterTerminalAllocs(t *testing.T) {
l := []*Allocation{
&Allocation{ID: "foo", DesiredStatus: AllocDesiredStatusRun},
&Allocation{ID: "bar", DesiredStatus: AllocDesiredStatusEvict},
&Allocation{ID: "baz", DesiredStatus: AllocDesiredStatusStop},
&Allocation{ID: "zip", DesiredStatus: AllocDesiredStatusRun},
&Allocation{
ID: "foo",
DesiredStatus: AllocDesiredStatusRun,
ClientStatus: AllocClientStatusPending,
},
&Allocation{
ID: "bam",
DesiredStatus: AllocDesiredStatusRun,
ClientStatus: AllocClientStatusDead,
},
}
out := FilterTerminalAllocs(l)
if len(out) != 2 {
if len(out) != 1 {
t.Fatalf("bad: %#v", out)
}
if out[0].ID != "foo" && out[1].ID != "zip" {
if out[0].ID != "foo" {
t.Fatalf("bad: %#v", out)
}
}

View File

@@ -19,17 +19,8 @@ import (
)
var (
ErrNoLeader = fmt.Errorf("No cluster leader")
ErrNoRegionPath = fmt.Errorf("No path to region")
defaultServiceJobRestartPolicy = RestartPolicy{
Delay: 15 * time.Second,
Attempts: 2,
Interval: 1 * time.Minute,
}
defaultBatchJobRestartPolicy = RestartPolicy{
Delay: 15 * time.Second,
Attempts: 15,
}
ErrNoLeader = fmt.Errorf("No cluster leader")
ErrNoRegionPath = fmt.Errorf("No path to region")
)
type MessageType uint8
@@ -764,6 +755,10 @@ type Job struct {
// Periodic is used to define the interval the job is run at.
Periodic *PeriodicConfig
// GC is used to mark the job as available for garbage collection after it
// has no outstanding evaluations or allocations.
GC bool
// Meta is used to associate arbitrary metadata with this
// job. This is opaque to Nomad.
Meta map[string]string
@@ -779,13 +774,16 @@ type Job struct {
ModifyIndex uint64
}
// InitAllServiceFields traverses all Task Groups and makes them
// interpolate Job, Task group and Task names in all Service names.
// It also generates the check names if they are not set. This method also
// generates Check and Service IDs
func (j *Job) InitAllServiceFields() {
// InitFields is used to initialize fields in the Job. This should be called
// when registering a Job.
func (j *Job) InitFields() {
for _, tg := range j.TaskGroups {
tg.InitAllServiceFields(j.Name)
tg.InitFields(j)
}
// If the job is batch then make it GC.
if j.Type == JobTypeBatch {
j.GC = true
}
}
@@ -968,15 +966,61 @@ func (p *PeriodicConfig) Next(fromTime time.Time) time.Time {
return time.Time{}
}
// RestartPolicy influences how Nomad restarts Tasks when they
// crash or fail.
var (
defaultServiceJobRestartPolicy = RestartPolicy{
Delay: 15 * time.Second,
Attempts: 2,
Interval: 1 * time.Minute,
RestartOnSuccess: true,
Mode: RestartPolicyModeDelay,
}
defaultBatchJobRestartPolicy = RestartPolicy{
Delay: 15 * time.Second,
Attempts: 15,
Interval: 7 * 24 * time.Hour,
RestartOnSuccess: false,
Mode: RestartPolicyModeDelay,
}
)
const (
// RestartPolicyModeDelay delays the next restart until the next interval is
// reached once the specified attempts have been exhausted within the interval.
RestartPolicyModeDelay = "delay"
// RestartPolicyModeFail causes a job to fail if the specified number of
// attempts is reached within an interval.
RestartPolicyModeFail = "fail"
)
// RestartPolicy configures how Tasks are restarted when they crash or fail.
type RestartPolicy struct {
// Attempts is the number of restarts that will occur in an interval.
Attempts int
// Interval is the duration within which the number of restarts is limited.
Interval time.Duration
Delay time.Duration
// Delay is the time between a failure and a restart.
Delay time.Duration
// RestartOnSuccess determines whether a task should be restarted if it
// exited successfully.
RestartOnSuccess bool `mapstructure:"on_success"`
// Mode controls what happens when the task restarts more than Attempts times
// in an interval.
Mode string
}
func (r *RestartPolicy) Validate() error {
switch r.Mode {
case RestartPolicyModeDelay, RestartPolicyModeFail:
default:
return fmt.Errorf("Unsupported restart mode: %q", r.Mode)
}
if r.Interval == 0 {
return nil
}
@@ -1024,12 +1068,15 @@ type TaskGroup struct {
Meta map[string]string
}
// InitAllServiceFields traverses over all Tasks and makes them to interpolate
// values of Job, Task Group and Task names in all Service Names.
// It also generates service ids, check ids and check names
func (tg *TaskGroup) InitAllServiceFields(job string) {
// InitFields is used to initialize fields in the TaskGroup.
func (tg *TaskGroup) InitFields(job *Job) {
// Set the default restart policy.
if tg.RestartPolicy == nil {
tg.RestartPolicy = NewRestartPolicy(job.Type)
}
for _, task := range tg.Tasks {
task.InitAllServiceFields(job, tg.Name)
task.InitFields(job, tg)
}
}
@@ -1106,7 +1153,6 @@ const (
// The ServiceCheck data model represents the consul health check that
// Nomad registers for a Task
type ServiceCheck struct {
Id string // Id of the check, must be unique and it is autogenrated
Name string // Name of the check, defaults to id
Type string // Type of the check - tcp, http, docker and script
Script string // Script to invoke for script check
@@ -1131,9 +1177,9 @@ func (sc *ServiceCheck) Validate() error {
return nil
}
func (sc *ServiceCheck) Hash(serviceId string) string {
func (sc *ServiceCheck) Hash(serviceID string) string {
h := sha1.New()
io.WriteString(h, serviceId)
io.WriteString(h, serviceID)
io.WriteString(h, sc.Name)
io.WriteString(h, sc.Type)
io.WriteString(h, sc.Script)
@@ -1145,9 +1191,12 @@ func (sc *ServiceCheck) Hash(serviceId string) string {
return fmt.Sprintf("%x", h.Sum(nil))
}
const (
NomadConsulPrefix = "nomad-registered-service"
)
// The Service model represents a Consul service definition
type Service struct {
Id string // Id of the service, this needs to be unique on a local machine
Name string // Name of the service, defaults to id
Tags []string // List of tags for the service
PortLabel string `mapstructure:"port"` // port for the service
@@ -1157,7 +1206,6 @@ type Service struct {
// InitFields interpolates values of Job, Task Group and Task in the Service
// Name. This also generates check names, service id and check ids.
func (s *Service) InitFields(job string, taskGroup string, task string) {
s.Id = GenerateUUID()
s.Name = args.ReplaceEnv(s.Name, map[string]string{
"JOB": job,
"TASKGROUP": taskGroup,
@@ -1167,7 +1215,6 @@ func (s *Service) InitFields(job string, taskGroup string, task string) {
)
for _, check := range s.Checks {
check.Id = check.Hash(s.Id)
if check.Name == "" {
check.Name = fmt.Sprintf("service: %q check", s.Name)
}
@@ -1224,10 +1271,15 @@ type Task struct {
Meta map[string]string
}
// InitAllServiceFields interpolates values of Job, Task Group
// InitFields initializes fields in the task.
func (t *Task) InitFields(job *Job, tg *TaskGroup) {
t.InitServiceFields(job.Name, tg.Name)
}
// InitServiceFields interpolates values of Job, Task Group
// and Tasks in all the service Names of a Task. This also generates the service
// id, check id and check names.
func (t *Task) InitAllServiceFields(job string, taskGroup string) {
func (t *Task) InitServiceFields(job string, taskGroup string) {
for _, service := range t.Services {
service.InitFields(job, taskGroup, t.Name)
}
@@ -1444,6 +1496,9 @@ type Allocation struct {
// task. These should sum to the total Resources.
TaskResources map[string]*Resources
// Services is a map of service names to service ids
Services map[string]string
// Metrics associated with this allocation
Metrics *AllocMetric
@@ -1467,12 +1522,20 @@ type Allocation struct {
ModifyIndex uint64
}
// TerminalStatus returns if the desired status is terminal and
// will no longer transition. This is not based on the current client status.
// TerminalStatus returns if the desired or actual status is terminal and
// will no longer transition.
func (a *Allocation) TerminalStatus() bool {
// First check the desired state and if that isn't terminal, check client
// state.
switch a.DesiredStatus {
case AllocDesiredStatusStop, AllocDesiredStatusEvict, AllocDesiredStatusFailed:
return true
default:
}
switch a.ClientStatus {
case AllocClientStatusDead, AllocClientStatusFailed:
return true
default:
return false
}
@@ -1497,6 +1560,35 @@ func (a *Allocation) Stub() *AllocListStub {
}
}
// PopulateServiceIDs generates the service IDs for all the service definitions
// in that Allocation
func (a *Allocation) PopulateServiceIDs() {
// Make a copy of the old map which contains the service names and their
// generated IDs
oldIDs := make(map[string]string)
for k, v := range a.Services {
oldIDs[k] = v
}
a.Services = make(map[string]string)
tg := a.Job.LookupTaskGroup(a.TaskGroup)
for _, task := range tg.Tasks {
for _, service := range task.Services {
// If the ID for a service name is already generated then we re-use
// it
if ID, ok := oldIDs[service.Name]; ok {
a.Services[service.Name] = ID
} else {
// If the service hasn't had an ID generated yet, we generate one.
// We add a prefix to the Service ID so that we know this service
// is managed by Nomad, since Consul can also hold services that are
// not managed by Nomad.
a.Services[service.Name] = fmt.Sprintf("%s-%s", NomadConsulPrefix, GenerateUUID())
}
}
}
}
// AllocListStub is used to return a subset of alloc information
type AllocListStub struct {
ID string
@@ -1624,6 +1716,12 @@ const (
// We periodically scan nodes in a terminal state, and if they have no
// corresponding allocations we delete these out of the system.
CoreJobNodeGC = "node-gc"
// CoreJobJobGC is used for the garbage collection of eligible jobs. We
// periodically scan garbage collectible jobs and check if both their
// evaluations and allocations are terminal. If so, we delete these out of
// the system.
CoreJobJobGC = "job-gc"
)
// Evaluation is used anytime we need to apply business logic as a result

View File

@@ -115,9 +115,11 @@ func TestJob_IsPeriodic(t *testing.T) {
func TestTaskGroup_Validate(t *testing.T) {
tg := &TaskGroup{
RestartPolicy: &RestartPolicy{
Interval: 5 * time.Minute,
Delay: 10 * time.Second,
Attempts: 10,
Interval: 5 * time.Minute,
Delay: 10 * time.Second,
Attempts: 10,
RestartOnSuccess: true,
Mode: RestartPolicyModeDelay,
},
}
err := tg.Validate()
@@ -141,9 +143,11 @@ func TestTaskGroup_Validate(t *testing.T) {
&Task{},
},
RestartPolicy: &RestartPolicy{
Interval: 5 * time.Minute,
Delay: 10 * time.Second,
Attempts: 10,
Interval: 5 * time.Minute,
Delay: 10 * time.Second,
Attempts: 10,
RestartOnSuccess: true,
Mode: RestartPolicyModeDelay,
},
}
err = tg.Validate()
@@ -391,13 +395,11 @@ func TestEncodeDecode(t *testing.T) {
func TestInvalidServiceCheck(t *testing.T) {
s := Service{
Id: "service-id",
Name: "service-name",
PortLabel: "bar",
Checks: []*ServiceCheck{
{
Id: "check-id",
Name: "check-name",
Type: "lol",
},
@@ -408,7 +410,7 @@ func TestInvalidServiceCheck(t *testing.T) {
}
}
func TestDistinctCheckId(t *testing.T) {
func TestDistinctCheckID(t *testing.T) {
c1 := ServiceCheck{
Name: "web-health",
Type: "http",
@@ -431,10 +433,10 @@ func TestDistinctCheckId(t *testing.T) {
Interval: 4 * time.Second,
Timeout: 3 * time.Second,
}
serviceId := "123"
c1Hash := c1.Hash(serviceId)
c2Hash := c2.Hash(serviceId)
c3Hash := c3.Hash(serviceId)
serviceID := "123"
c1Hash := c1.Hash(serviceID)
c2Hash := c2.Hash(serviceID)
c3Hash := c3.Hash(serviceID)
if c1Hash == c2Hash || c1Hash == c3Hash || c3Hash == c2Hash {
t.Fatalf("Checks need to be uniq c1: %s, c2: %s, c3: %s", c1Hash, c2Hash, c3Hash)
@@ -442,7 +444,7 @@ func TestDistinctCheckId(t *testing.T) {
}
func TestService_InitFiels(t *testing.T) {
func TestService_InitFields(t *testing.T) {
job := "example"
taskGroup := "cache"
task := "redis"
@@ -455,9 +457,6 @@ func TestService_InitFiels(t *testing.T) {
if s.Name != "redis-db" {
t.Fatalf("Expected name: %v, Actual: %v", "redis-db", s.Name)
}
if s.Id == "" {
t.Fatalf("Expected a GUID for Service ID, Actual: %v", s.Id)
}
s.Name = "db"
s.InitFields(job, taskGroup, task)
@@ -510,7 +509,7 @@ func TestJob_ExpandServiceNames(t *testing.T) {
},
}
j.InitAllServiceFields()
j.InitFields()
service1Name := j.TaskGroups[0].Tasks[0].Services[0].Name
if service1Name != "my-job-web-frontend-default" {

View File

@@ -61,6 +61,7 @@ func TestEvalContext_ProposedAlloc(t *testing.T) {
MemoryMB: 2048,
},
DesiredStatus: structs.AllocDesiredStatusRun,
ClientStatus: structs.AllocClientStatusPending,
}
alloc2 := &structs.Allocation{
ID: structs.GenerateUUID(),
@@ -72,6 +73,7 @@ func TestEvalContext_ProposedAlloc(t *testing.T) {
MemoryMB: 1024,
},
DesiredStatus: structs.AllocDesiredStatusRun,
ClientStatus: structs.AllocClientStatusPending,
}
noErr(t, state.UpsertAllocs(1000, []*structs.Allocation{alloc1, alloc2}))

View File

@@ -281,6 +281,10 @@ func (s *GenericScheduler) computePlacements(place []allocTuple) error {
// Set fields based on if we found an allocation option
if option != nil {
// Generate the service ids for the tasks which this allocation is going
// to run
alloc.PopulateServiceIDs()
alloc.NodeID = option.Node.ID
alloc.TaskResources = option.TaskResources
alloc.DesiredStatus = structs.AllocDesiredStatusRun

View File

@@ -203,6 +203,7 @@ func TestBinPackIterator_ExistingAlloc(t *testing.T) {
MemoryMB: 2048,
},
DesiredStatus: structs.AllocDesiredStatusRun,
ClientStatus: structs.AllocClientStatusPending,
}
alloc2 := &structs.Allocation{
ID: structs.GenerateUUID(),
@@ -214,6 +215,7 @@ func TestBinPackIterator_ExistingAlloc(t *testing.T) {
MemoryMB: 1024,
},
DesiredStatus: structs.AllocDesiredStatusRun,
ClientStatus: structs.AllocClientStatusPending,
}
noErr(t, state.UpsertAllocs(1000, []*structs.Allocation{alloc1, alloc2}))
@@ -277,6 +279,7 @@ func TestBinPackIterator_ExistingAlloc_PlannedEvict(t *testing.T) {
MemoryMB: 2048,
},
DesiredStatus: structs.AllocDesiredStatusRun,
ClientStatus: structs.AllocClientStatusPending,
}
alloc2 := &structs.Allocation{
ID: structs.GenerateUUID(),
@@ -288,6 +291,7 @@ func TestBinPackIterator_ExistingAlloc_PlannedEvict(t *testing.T) {
MemoryMB: 1024,
},
DesiredStatus: structs.AllocDesiredStatusRun,
ClientStatus: structs.AllocClientStatusPending,
}
noErr(t, state.UpsertAllocs(1000, []*structs.Allocation{alloc1, alloc2}))
@@ -317,7 +321,7 @@ func TestBinPackIterator_ExistingAlloc_PlannedEvict(t *testing.T) {
t.Fatalf("Bad: %v", out[0])
}
if out[1].Score != 18 {
t.Fatalf("Bad: %v", out[0])
t.Fatalf("Bad: %v", out[1])
}
}

View File

@@ -248,6 +248,10 @@ func (s *SystemScheduler) computePlacements(place []allocTuple) error {
// Set fields based on if we found an allocation option
if option != nil {
// Generate the service ids for the tasks that this allocation is going
// to run
alloc.PopulateServiceIDs()
alloc.NodeID = option.Node.ID
alloc.TaskResources = option.TaskResources
alloc.DesiredStatus = structs.AllocDesiredStatusRun

View File

@@ -383,6 +383,7 @@ func inplaceUpdate(ctx Context, eval *structs.Evaluation, job *structs.Job,
newAlloc.Metrics = ctx.Metrics()
newAlloc.DesiredStatus = structs.AllocDesiredStatusRun
newAlloc.ClientStatus = structs.AllocClientStatusPending
newAlloc.PopulateServiceIDs()
ctx.Plan().AppendAlloc(newAlloc)
// Remove this allocation from the slice

View File

@@ -539,11 +539,12 @@ func TestInplaceUpdate_Success(t *testing.T) {
// Register an alloc
alloc := &structs.Allocation{
ID: structs.GenerateUUID(),
EvalID: eval.ID,
NodeID: node.ID,
JobID: job.ID,
Job: job,
ID: structs.GenerateUUID(),
EvalID: eval.ID,
NodeID: node.ID,
JobID: job.ID,
Job: job,
TaskGroup: job.TaskGroups[0].Name,
Resources: &structs.Resources{
CPU: 2048,
MemoryMB: 2048,
@@ -551,13 +552,37 @@ func TestInplaceUpdate_Success(t *testing.T) {
DesiredStatus: structs.AllocDesiredStatusRun,
}
alloc.TaskResources = map[string]*structs.Resources{"web": alloc.Resources}
alloc.PopulateServiceIDs()
noErr(t, state.UpsertAllocs(1001, []*structs.Allocation{alloc}))
webFeSrvID := alloc.Services["web-frontend"]
adminSrvID := alloc.Services["web-admin"]
if webFeSrvID == "" || adminSrvID == "" {
t.Fatal("Service ID needs to be generated for service")
}
// Create a new task group that updates the resources.
tg := &structs.TaskGroup{}
*tg = *job.TaskGroups[0]
resource := &structs.Resources{CPU: 737}
tg.Tasks[0].Resources = resource
newServices := []*structs.Service{
{
Name: "dummy-service",
PortLabel: "http",
},
{
Name: "dummy-service2",
PortLabel: "http",
},
}
// Delete service 2
tg.Tasks[0].Services = tg.Tasks[0].Services[:1]
// Add the new services
tg.Tasks[0].Services = append(tg.Tasks[0].Services, newServices...)
updates := []allocTuple{{Alloc: alloc, TaskGroup: tg}}
stack := NewGenericStack(false, ctx)
@@ -573,6 +598,23 @@ func TestInplaceUpdate_Success(t *testing.T) {
if len(ctx.plan.NodeAllocation) != 1 {
t.Fatal("inplaceUpdate did not do an inplace update")
}
// Get the alloc we inserted.
a := ctx.plan.NodeAllocation[alloc.NodeID][0]
if len(a.Services) != 3 {
t.Fatalf("Expected number of services: %v, Actual: %v", 3, len(a.Services))
}
// Test that the service id for the old service is still the same
if a.Services["web-frontend"] != webFeSrvID {
t.Fatalf("Expected service ID: %v, Actual: %v", webFeSrvID, a.Services["web-frontend"])
}
// Test that the map doesn't contain the service ID of the admin Service
// anymore
if _, ok := a.Services["web-admin"]; ok {
t.Fatal("Service shouldn't be present")
}
}
func TestEvictAndPlace_LimitGreaterThanAllocs(t *testing.T) {

View File

@@ -44,6 +44,7 @@ gox \
-arch="${XC_ARCH}" \
-osarch="!linux/arm !darwin/386" \
-ldflags "-X main.GitCommit ${GIT_COMMIT}${GIT_DIRTY}" \
-cgo \
-output "pkg/{{.OS}}_{{.Arch}}/nomad" \
.

View File

@@ -3,7 +3,6 @@
# First get the OS-specific packages
GOOS=windows go get $DEP_ARGS github.com/StackExchange/wmi
GOOS=windows go get $DEP_ARGS github.com/shirou/w32
GOOS=linux go get $DEP_ARGS github.com/docker/docker/pkg/units
GOOS=linux go get $DEP_ARGS github.com/docker/docker/pkg/mount
GOOS=linux go get $DEP_ARGS github.com/opencontainers/runc/libcontainer/cgroups/fs
GOOS=linux go get $DEP_ARGS github.com/opencontainers/runc/libcontainer/configs

View File

@@ -5,9 +5,9 @@ var GitCommit string
var GitDescribe string
// The main version number that is being run at the moment.
const Version = "0.3.0"
const Version = "0.2.3"
// A pre-release marker for the version. If this is "" (empty string)
// then it means that it is a final release. Otherwise, this is a pre-release
// such as "dev" (in development), "beta", "rc1", etc.
const VersionPrerelease = "dev"
const VersionPrerelease = ""

View File

@@ -2,7 +2,7 @@ set :base_url, "https://www.nomadproject.io/"
activate :hashicorp do |h|
h.name = "nomad"
h.version = "0.2.1"
h.version = "0.2.3"
h.github_slug = "hashicorp/nomad"
h.minify_javascript = false

View File

@@ -62,7 +62,7 @@ The following options are available for use in the job specification.
* `dns_servers` - (Optional) A list of DNS servers for the container to use
(e.g. ["8.8.8.8", "8.8.4.4"]). *Docker API v1.10 and above only*
* `search_domains` - (Optional) A list of DNS search domains for the container
* `dns_search_domains` - (Optional) A list of DNS search domains for the container
to use.
* `port_map` - (Optional) A key/value map of port labels (see below).
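
As an illustrative sketch only (the image name and domains are hypothetical, not taken from this page), these DNS options sit inside a Docker task's `config` block:

```
task "webservice" {
  driver = "docker"

  config {
    # Hypothetical image and domains, shown only to illustrate the options above.
    image              = "redis:latest"
    dns_servers        = ["8.8.8.8", "8.8.4.4"]
    dns_search_domains = ["internal.example.com"]
  }
}
```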

View File

@@ -236,19 +236,28 @@ The `network` object supports the following keys:
The `restart` object supports the following keys:
* `attempts` - For `batch` jobs, `attempts` is the maximum number of restarts
allowed before the task is failed. For non-batch jobs, the `attempts` is the
number of restarts allowed in an `interval` before a restart delay is added.
* `attempts` - `attempts` is the number of restarts allowed in an `interval`.
* `interval` - `interval` is only valid on non-batch jobs and is a time duration
that can be specified using the `s`, `m`, and `h` suffixes, such as `30s`.
The `interval` begins when the first task starts and ensures that only
`attempts` number of restarts happens within it. If more than `attempts`
number of failures happen, the restart is delayed till after the `interval`,
which is then reset.
* `interval` - `interval` is a time duration that can be specified using the
`s`, `m`, and `h` suffixes, such as `30s`. The `interval` begins when the
first task starts and ensures that at most `attempts` restarts happen
within it. If more than `attempts` failures occur, the behavior is
controlled by `mode`.
* `delay` - A duration to wait before restarting a task. It is specified as a
time duration using the `s`, `m`, and `h` suffixes, such as `30s`.
time duration using the `s`, `m`, and `h` suffixes, such as `30s`. A random
jitter of up to 25% is added to the delay.
* `on_success` - `on_success` controls whether a task is restarted when the
task exits successfully.
* `mode` - Controls the behavior when the task fails more than `attempts`
times in an interval. Possible values are listed below:
* `delay` - `delay` will delay the next restart until the next `interval` is
reached.
* `fail` - `fail` will not restart the task again.
The default `batch` restart policy is:
@@ -256,6 +265,9 @@ The default `batch` restart policy is:
restart {
attempts = 15
delay = "15s"
interval = "168h" # 7 days
on_success = false
mode = "delay"
}
```
@@ -266,6 +278,8 @@ restart {
interval = "1m"
attempts = 2
delay = "15s"
on_success = true
mode = "delay"
}
```
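
For comparison, here is a sketch (not one of the official defaults; the values are illustrative) of a policy that uses the `fail` mode to stop retrying after a short burst of failures:

```
restart {
  # Allow at most 3 restarts within 5 minutes, then mark the task as failed
  # instead of delaying the next restart until the following interval.
  attempts   = 3
  interval   = "5m"
  delay      = "15s"
  on_success = false
  mode       = "fail"
}
```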