Merge pull request #4259 from hashicorp/f-deployment-improvements

This commit is contained in:
Preetha
2018-05-08 16:37:10 -05:00
committed by GitHub
68 changed files with 4674 additions and 1687 deletions

View File

@@ -14,7 +14,6 @@ import (
metrics "github.com/armon/go-metrics"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/nomad/client/driver"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/structs"
@@ -603,11 +602,11 @@ func (c *ServiceClient) RegisterAgent(role string, services []*structs.Service)
// serviceRegs creates service registrations, check registrations, and script
// checks from a service. It returns a service registration object with the
// service and check IDs populated.
func (c *ServiceClient) serviceRegs(ops *operations, allocID string, service *structs.Service,
task *structs.Task, exec driver.ScriptExecutor, net *cstructs.DriverNetwork) (*ServiceRegistration, error) {
func (c *ServiceClient) serviceRegs(ops *operations, service *structs.Service, task *TaskServices) (
*ServiceRegistration, error) {
// Get the services ID
id := makeTaskServiceID(allocID, task.Name, service)
id := makeTaskServiceID(task.AllocID, task.Name, service, task.Canary)
sreg := &ServiceRegistration{
serviceID: id,
checkIDs: make(map[string]struct{}, len(service.Checks)),
@@ -620,26 +619,33 @@ func (c *ServiceClient) serviceRegs(ops *operations, allocID string, service *st
}
// Determine the address to advertise based on the mode
ip, port, err := getAddress(addrMode, service.PortLabel, task.Resources.Networks, net)
ip, port, err := getAddress(addrMode, service.PortLabel, task.Networks, task.DriverNetwork)
if err != nil {
return nil, fmt.Errorf("unable to get address for service %q: %v", service.Name, err)
}
// Determine whether to use tags or canary_tags
var tags []string
if task.Canary {
tags = make([]string, len(service.CanaryTags))
copy(tags, service.CanaryTags)
} else {
tags = make([]string, len(service.Tags))
copy(tags, service.Tags)
}
// Build the Consul Service registration request
serviceReg := &api.AgentServiceRegistration{
ID: id,
Name: service.Name,
Tags: make([]string, len(service.Tags)),
Tags: tags,
Address: ip,
Port: port,
}
// copy isn't strictly necessary but can avoid bugs especially
// with tests that may reuse Tasks
copy(serviceReg.Tags, service.Tags)
ops.regServices = append(ops.regServices, serviceReg)
// Build the check registrations
checkIDs, err := c.checkRegs(ops, allocID, id, service, task, exec, net)
checkIDs, err := c.checkRegs(ops, id, service, task)
if err != nil {
return nil, err
}
@@ -651,8 +657,8 @@ func (c *ServiceClient) serviceRegs(ops *operations, allocID string, service *st
// checkRegs registers the checks for the given service and returns the
// registered check ids.
func (c *ServiceClient) checkRegs(ops *operations, allocID, serviceID string, service *structs.Service,
task *structs.Task, exec driver.ScriptExecutor, net *cstructs.DriverNetwork) ([]string, error) {
func (c *ServiceClient) checkRegs(ops *operations, serviceID string, service *structs.Service,
task *TaskServices) ([]string, error) {
// Fast path
numChecks := len(service.Checks)
@@ -665,11 +671,13 @@ func (c *ServiceClient) checkRegs(ops *operations, allocID, serviceID string, se
checkID := makeCheckID(serviceID, check)
checkIDs = append(checkIDs, checkID)
if check.Type == structs.ServiceCheckScript {
if exec == nil {
if task.DriverExec == nil {
return nil, fmt.Errorf("driver doesn't support script checks")
}
ops.scripts = append(ops.scripts, newScriptCheck(
allocID, task.Name, checkID, check, exec, c.client, c.logger, c.shutdownCh))
sc := newScriptCheck(task.AllocID, task.Name, checkID, check, task.DriverExec,
c.client, c.logger, c.shutdownCh)
ops.scripts = append(ops.scripts, sc)
// Skip getAddress for script checks
checkReg, err := createCheckReg(serviceID, checkID, check, "", 0)
@@ -693,7 +701,7 @@ func (c *ServiceClient) checkRegs(ops *operations, allocID, serviceID string, se
addrMode = structs.AddressModeHost
}
ip, port, err := getAddress(addrMode, portLabel, task.Resources.Networks, net)
ip, port, err := getAddress(addrMode, portLabel, task.Networks, task.DriverNetwork)
if err != nil {
return nil, fmt.Errorf("error getting address for check %q: %v", check.Name, err)
}
@@ -714,7 +722,7 @@ func (c *ServiceClient) checkRegs(ops *operations, allocID, serviceID string, se
// Checks will always use the IP from the Task struct (host's IP).
//
// Actual communication with Consul is done asynchronously (see Run).
func (c *ServiceClient) RegisterTask(allocID string, task *structs.Task, restarter TaskRestarter, exec driver.ScriptExecutor, net *cstructs.DriverNetwork) error {
func (c *ServiceClient) RegisterTask(task *TaskServices) error {
// Fast path
numServices := len(task.Services)
if numServices == 0 {
@@ -726,7 +734,7 @@ func (c *ServiceClient) RegisterTask(allocID string, task *structs.Task, restart
ops := &operations{}
for _, service := range task.Services {
sreg, err := c.serviceRegs(ops, allocID, service, task, exec, net)
sreg, err := c.serviceRegs(ops, service, task)
if err != nil {
return err
}
@@ -734,18 +742,18 @@ func (c *ServiceClient) RegisterTask(allocID string, task *structs.Task, restart
}
// Add the task to the allocation's registration
c.addTaskRegistration(allocID, task.Name, t)
c.addTaskRegistration(task.AllocID, task.Name, t)
c.commit(ops)
// Start watching checks. Done after service registrations are built
// since an error building them could leak watches.
for _, service := range task.Services {
serviceID := makeTaskServiceID(allocID, task.Name, service)
serviceID := makeTaskServiceID(task.AllocID, task.Name, service, task.Canary)
for _, check := range service.Checks {
if check.TriggersRestarts() {
checkID := makeCheckID(serviceID, check)
c.checkWatcher.Watch(allocID, task.Name, checkID, check, restarter)
c.checkWatcher.Watch(task.AllocID, task.Name, checkID, check, task.Restarter)
}
}
}
@@ -756,19 +764,19 @@ func (c *ServiceClient) RegisterTask(allocID string, task *structs.Task, restart
// changed.
//
// DriverNetwork must not change between invocations for the same allocation.
func (c *ServiceClient) UpdateTask(allocID string, existing, newTask *structs.Task, restarter TaskRestarter, exec driver.ScriptExecutor, net *cstructs.DriverNetwork) error {
func (c *ServiceClient) UpdateTask(old, newTask *TaskServices) error {
ops := &operations{}
taskReg := new(TaskRegistration)
taskReg.Services = make(map[string]*ServiceRegistration, len(newTask.Services))
existingIDs := make(map[string]*structs.Service, len(existing.Services))
for _, s := range existing.Services {
existingIDs[makeTaskServiceID(allocID, existing.Name, s)] = s
existingIDs := make(map[string]*structs.Service, len(old.Services))
for _, s := range old.Services {
existingIDs[makeTaskServiceID(old.AllocID, old.Name, s, old.Canary)] = s
}
newIDs := make(map[string]*structs.Service, len(newTask.Services))
for _, s := range newTask.Services {
newIDs[makeTaskServiceID(allocID, newTask.Name, s)] = s
newIDs[makeTaskServiceID(newTask.AllocID, newTask.Name, s, newTask.Canary)] = s
}
// Loop over existing Service IDs to see if they have been removed or
@@ -816,7 +824,7 @@ func (c *ServiceClient) UpdateTask(allocID string, existing, newTask *structs.Ta
}
// New check on an unchanged service; add them now
newCheckIDs, err := c.checkRegs(ops, allocID, existingID, newSvc, newTask, exec, net)
newCheckIDs, err := c.checkRegs(ops, existingID, newSvc, newTask)
if err != nil {
return err
}
@@ -828,7 +836,7 @@ func (c *ServiceClient) UpdateTask(allocID string, existing, newTask *structs.Ta
// Update all watched checks as CheckRestart fields aren't part of ID
if check.TriggersRestarts() {
c.checkWatcher.Watch(allocID, newTask.Name, checkID, check, restarter)
c.checkWatcher.Watch(newTask.AllocID, newTask.Name, checkID, check, newTask.Restarter)
}
}
@@ -845,7 +853,7 @@ func (c *ServiceClient) UpdateTask(allocID string, existing, newTask *structs.Ta
// Any remaining services should just be enqueued directly
for _, newSvc := range newIDs {
sreg, err := c.serviceRegs(ops, allocID, newSvc, newTask, exec, net)
sreg, err := c.serviceRegs(ops, newSvc, newTask)
if err != nil {
return err
}
@@ -854,18 +862,18 @@ func (c *ServiceClient) UpdateTask(allocID string, existing, newTask *structs.Ta
}
// Add the task to the allocation's registration
c.addTaskRegistration(allocID, newTask.Name, taskReg)
c.addTaskRegistration(newTask.AllocID, newTask.Name, taskReg)
c.commit(ops)
// Start watching checks. Done after service registrations are built
// since an error building them could leak watches.
for _, service := range newIDs {
serviceID := makeTaskServiceID(allocID, newTask.Name, service)
serviceID := makeTaskServiceID(newTask.AllocID, newTask.Name, service, newTask.Canary)
for _, check := range service.Checks {
if check.TriggersRestarts() {
checkID := makeCheckID(serviceID, check)
c.checkWatcher.Watch(allocID, newTask.Name, checkID, check, restarter)
c.checkWatcher.Watch(newTask.AllocID, newTask.Name, checkID, check, newTask.Restarter)
}
}
}
@@ -875,11 +883,11 @@ func (c *ServiceClient) UpdateTask(allocID string, existing, newTask *structs.Ta
// RemoveTask from Consul. Removes all service entries and checks.
//
// Actual communication with Consul is done asynchronously (see Run).
func (c *ServiceClient) RemoveTask(allocID string, task *structs.Task) {
func (c *ServiceClient) RemoveTask(task *TaskServices) {
ops := operations{}
for _, service := range task.Services {
id := makeTaskServiceID(allocID, task.Name, service)
id := makeTaskServiceID(task.AllocID, task.Name, service, task.Canary)
ops.deregServices = append(ops.deregServices, id)
for _, check := range service.Checks {
@@ -893,7 +901,7 @@ func (c *ServiceClient) RemoveTask(allocID string, task *structs.Task) {
}
// Remove the task from the alloc's registrations
c.removeTaskRegistration(allocID, task.Name)
c.removeTaskRegistration(task.AllocID, task.Name)
// Now add them to the deregistration fields; main Run loop will update
c.commit(&ops)
@@ -1037,7 +1045,7 @@ func (c *ServiceClient) removeTaskRegistration(allocID, taskName string) {
// Example Client ID: _nomad-client-ggnjpgl7yn7rgmvxzilmpvrzzvrszc7l
//
func makeAgentServiceID(role string, service *structs.Service) string {
return fmt.Sprintf("%s-%s-%s", nomadServicePrefix, role, service.Hash(role, ""))
return fmt.Sprintf("%s-%s-%s", nomadServicePrefix, role, service.Hash(role, "", false))
}
// makeTaskServiceID creates a unique ID for identifying a task service in
@@ -1045,8 +1053,8 @@ func makeAgentServiceID(role string, service *structs.Service) string {
// Checks. This allows updates to merely compare IDs.
//
// Example Service ID: _nomad-task-TNM333JKJPM5AK4FAS3VXQLXFDWOF4VH
func makeTaskServiceID(allocID, taskName string, service *structs.Service) string {
return nomadTaskPrefix + service.Hash(allocID, taskName)
func makeTaskServiceID(allocID, taskName string, service *structs.Service, canary bool) string {
return nomadTaskPrefix + service.Hash(allocID, taskName, canary)
}
// makeCheckID creates a unique ID for a check.

View File

@@ -0,0 +1,67 @@
package consul
import (
"github.com/hashicorp/nomad/client/driver"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/nomad/structs"
)
type TaskServices struct {
AllocID string
// Name of the task
Name string
// Canary indicates whether or not the allocation is a canary
Canary bool
// Restarter allows restarting the task depending on the task's
// check_restart stanzas.
Restarter TaskRestarter
// Services and checks to register for the task.
Services []*structs.Service
// Networks from the task's resources stanza.
Networks structs.Networks
// DriverExec is the script executor for the task's driver.
DriverExec driver.ScriptExecutor
// DriverNetwork is the network specified by the driver and may be nil.
DriverNetwork *cstructs.DriverNetwork
}
func NewTaskServices(alloc *structs.Allocation, task *structs.Task, restarter TaskRestarter, exec driver.ScriptExecutor, net *cstructs.DriverNetwork) *TaskServices {
ts := TaskServices{
AllocID: alloc.ID,
Name: task.Name,
Restarter: restarter,
Services: task.Services,
DriverExec: exec,
DriverNetwork: net,
}
if task.Resources != nil {
ts.Networks = task.Resources.Networks
}
if alloc.DeploymentStatus != nil && alloc.DeploymentStatus.Canary {
ts.Canary = true
}
return &ts
}
// Copy method for easing tests
func (t *TaskServices) Copy() *TaskServices {
newTS := new(TaskServices)
*newTS = *t
// Deep copy Services
newTS.Services = make([]*structs.Service, len(t.Services))
for i := range t.Services {
newTS.Services[i] = t.Services[i].Copy()
}
return newTS
}

View File

@@ -11,7 +11,9 @@ import (
"github.com/hashicorp/consul/api"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/kr/pretty"
@@ -25,19 +27,11 @@ const (
yPort = 1235
)
func testTask() *structs.Task {
return &structs.Task{
Name: "taskname",
Resources: &structs.Resources{
Networks: []*structs.NetworkResource{
{
DynamicPorts: []structs.Port{
{Label: "x", Value: xPort},
{Label: "y", Value: yPort},
},
},
},
},
func testTask() *TaskServices {
return &TaskServices{
AllocID: uuid.Generate(),
Name: "taskname",
Restarter: &restartRecorder{},
Services: []*structs.Service{
{
Name: "taskname-service",
@@ -45,9 +39,46 @@ func testTask() *structs.Task {
Tags: []string{"tag1", "tag2"},
},
},
Networks: []*structs.NetworkResource{
{
DynamicPorts: []structs.Port{
{Label: "x", Value: xPort},
{Label: "y", Value: yPort},
},
},
},
DriverExec: newMockExec(),
}
}
// mockExec implements the ScriptExecutor interface and will use an alternate
// implementation t.ExecFunc if non-nil.
type mockExec struct {
// Ticked whenever a script is called
execs chan int
// If non-nil will be called by script checks
ExecFunc func(ctx context.Context, cmd string, args []string) ([]byte, int, error)
}
func newMockExec() *mockExec {
return &mockExec{
execs: make(chan int, 100),
}
}
func (m *mockExec) Exec(ctx context.Context, cmd string, args []string) ([]byte, int, error) {
select {
case m.execs <- 1:
default:
}
if m.ExecFunc == nil {
// Default impl is just "ok"
return []byte("ok"), 0, nil
}
return m.ExecFunc(ctx, cmd, args)
}
// restartRecorder is a minimal TaskRestarter implementation that simply
// counts how many restarts were triggered.
type restartRecorder struct {
@@ -58,33 +89,12 @@ func (r *restartRecorder) Restart(source, reason string, failure bool) {
atomic.AddInt64(&r.restarts, 1)
}
// testFakeCtx contains a fake Consul AgentAPI and implements the Exec
// interface to allow testing without running Consul.
// testFakeCtx contains a fake Consul AgentAPI
type testFakeCtx struct {
ServiceClient *ServiceClient
FakeConsul *MockAgent
Task *structs.Task
Restarter *restartRecorder
// Ticked whenever a script is called
execs chan int
// If non-nil will be called by script checks
ExecFunc func(ctx context.Context, cmd string, args []string) ([]byte, int, error)
}
// Exec implements the ScriptExecutor interface and will use an alternate
// implementation t.ExecFunc if non-nil.
func (t *testFakeCtx) Exec(ctx context.Context, cmd string, args []string) ([]byte, int, error) {
select {
case t.execs <- 1:
default:
}
if t.ExecFunc == nil {
// Default impl is just "ok"
return []byte("ok"), 0, nil
}
return t.ExecFunc(ctx, cmd, args)
Task *TaskServices
MockExec *mockExec
}
var errNoOps = fmt.Errorf("testing error: no pending operations")
@@ -105,20 +115,19 @@ func (t *testFakeCtx) syncOnce() error {
// A test Task is also provided.
func setupFake(t *testing.T) *testFakeCtx {
fc := NewMockAgent()
tt := testTask()
return &testFakeCtx{
ServiceClient: NewServiceClient(fc, testlog.Logger(t)),
FakeConsul: fc,
Task: testTask(),
Restarter: &restartRecorder{},
execs: make(chan int, 100),
Task: tt,
MockExec: tt.DriverExec.(*mockExec),
}
}
func TestConsul_ChangeTags(t *testing.T) {
ctx := setupFake(t)
allocID := "allocid"
if err := ctx.ServiceClient.RegisterTask(allocID, ctx.Task, ctx.Restarter, nil, nil); err != nil {
if err := ctx.ServiceClient.RegisterTask(ctx.Task); err != nil {
t.Fatalf("unexpected error registering task: %v", err)
}
@@ -132,7 +141,7 @@ func TestConsul_ChangeTags(t *testing.T) {
// Query the allocs registrations and then again when we update. The IDs
// should change
reg1, err := ctx.ServiceClient.AllocRegistrations(allocID)
reg1, err := ctx.ServiceClient.AllocRegistrations(ctx.Task.AllocID)
if err != nil {
t.Fatalf("Looking up alloc registration failed: %v", err)
}
@@ -157,10 +166,9 @@ func TestConsul_ChangeTags(t *testing.T) {
}
}
origTask := ctx.Task
ctx.Task = testTask()
origTask := ctx.Task.Copy()
ctx.Task.Services[0].Tags[0] = "newtag"
if err := ctx.ServiceClient.UpdateTask("allocid", origTask, ctx.Task, nil, nil, nil); err != nil {
if err := ctx.ServiceClient.UpdateTask(origTask, ctx.Task); err != nil {
t.Fatalf("unexpected error registering task: %v", err)
}
if err := ctx.syncOnce(); err != nil {
@@ -184,7 +192,7 @@ func TestConsul_ChangeTags(t *testing.T) {
}
// Check again and ensure the IDs changed
reg2, err := ctx.ServiceClient.AllocRegistrations(allocID)
reg2, err := ctx.ServiceClient.AllocRegistrations(ctx.Task.AllocID)
if err != nil {
t.Fatalf("Looking up alloc registration failed: %v", err)
}
@@ -242,7 +250,7 @@ func TestConsul_ChangePorts(t *testing.T) {
},
}
if err := ctx.ServiceClient.RegisterTask("allocid", ctx.Task, ctx.Restarter, ctx, nil); err != nil {
if err := ctx.ServiceClient.RegisterTask(ctx.Task); err != nil {
t.Fatalf("unexpected error registering task: %v", err)
}
@@ -285,8 +293,8 @@ func TestConsul_ChangePorts(t *testing.T) {
case "c2":
origScriptKey = k
select {
case <-ctx.execs:
if n := len(ctx.execs); n > 0 {
case <-ctx.MockExec.execs:
if n := len(ctx.MockExec.execs); n > 0 {
t.Errorf("expected 1 exec but found: %d", n+1)
}
case <-time.After(3 * time.Second):
@@ -303,8 +311,7 @@ func TestConsul_ChangePorts(t *testing.T) {
}
// Now update the PortLabel on the Service and Check c3
origTask := ctx.Task
ctx.Task = testTask()
origTask := ctx.Task.Copy()
ctx.Task.Services[0].PortLabel = "y"
ctx.Task.Services[0].Checks = []*structs.ServiceCheck{
{
@@ -330,7 +337,7 @@ func TestConsul_ChangePorts(t *testing.T) {
// Removed PortLabel; should default to service's (y)
},
}
if err := ctx.ServiceClient.UpdateTask("allocid", origTask, ctx.Task, nil, ctx, nil); err != nil {
if err := ctx.ServiceClient.UpdateTask(origTask, ctx.Task); err != nil {
t.Fatalf("unexpected error registering task: %v", err)
}
if err := ctx.syncOnce(); err != nil {
@@ -374,8 +381,8 @@ func TestConsul_ChangePorts(t *testing.T) {
t.Errorf("expected key change for %s from %q", v.Name, origScriptKey)
}
select {
case <-ctx.execs:
if n := len(ctx.execs); n > 0 {
case <-ctx.MockExec.execs:
if n := len(ctx.MockExec.execs); n > 0 {
t.Errorf("expected 1 exec but found: %d", n+1)
}
case <-time.After(3 * time.Second):
@@ -411,8 +418,7 @@ func TestConsul_ChangeChecks(t *testing.T) {
},
}
allocID := "allocid"
if err := ctx.ServiceClient.RegisterTask(allocID, ctx.Task, ctx.Restarter, ctx, nil); err != nil {
if err := ctx.ServiceClient.RegisterTask(ctx.Task); err != nil {
t.Fatalf("unexpected error registering task: %v", err)
}
@@ -433,7 +439,7 @@ func TestConsul_ChangeChecks(t *testing.T) {
// Query the allocs registrations and then again when we update. The IDs
// should change
reg1, err := ctx.ServiceClient.AllocRegistrations(allocID)
reg1, err := ctx.ServiceClient.AllocRegistrations(ctx.Task.AllocID)
if err != nil {
t.Fatalf("Looking up alloc registration failed: %v", err)
}
@@ -489,7 +495,7 @@ func TestConsul_ChangeChecks(t *testing.T) {
PortLabel: "x",
},
}
if err := ctx.ServiceClient.UpdateTask("allocid", origTask, ctx.Task, nil, ctx, nil); err != nil {
if err := ctx.ServiceClient.UpdateTask(origTask, ctx.Task); err != nil {
t.Fatalf("unexpected error registering task: %v", err)
}
@@ -547,7 +553,7 @@ func TestConsul_ChangeChecks(t *testing.T) {
}
// Check again and ensure the IDs changed
reg2, err := ctx.ServiceClient.AllocRegistrations(allocID)
reg2, err := ctx.ServiceClient.AllocRegistrations(ctx.Task.AllocID)
if err != nil {
t.Fatalf("Looking up alloc registration failed: %v", err)
}
@@ -603,7 +609,7 @@ func TestConsul_ChangeChecks(t *testing.T) {
PortLabel: "x",
},
}
if err := ctx.ServiceClient.UpdateTask("allocid", origTask, ctx.Task, nil, ctx, nil); err != nil {
if err := ctx.ServiceClient.UpdateTask(origTask, ctx.Task); err != nil {
t.Fatalf("unexpected error registering task: %v", err)
}
if err := ctx.syncOnce(); err != nil {
@@ -646,7 +652,7 @@ func TestConsul_RegServices(t *testing.T) {
},
}
if err := ctx.ServiceClient.RegisterTask("allocid", ctx.Task, ctx.Restarter, nil, nil); err != nil {
if err := ctx.ServiceClient.RegisterTask(ctx.Task); err != nil {
t.Fatalf("unexpected error registering task: %v", err)
}
@@ -677,7 +683,7 @@ func TestConsul_RegServices(t *testing.T) {
// Assert the check update is properly formed
checkUpd := <-ctx.ServiceClient.checkWatcher.checkUpdateCh
if checkUpd.checkRestart.allocID != "allocid" {
if checkUpd.checkRestart.allocID != ctx.Task.AllocID {
t.Fatalf("expected check's allocid to be %q but found %q", "allocid", checkUpd.checkRestart.allocID)
}
if expected := 200 * time.Millisecond; checkUpd.checkRestart.timeLimit != expected {
@@ -687,7 +693,7 @@ func TestConsul_RegServices(t *testing.T) {
// Make a change which will register a new service
ctx.Task.Services[0].Name = "taskname-service2"
ctx.Task.Services[0].Tags[0] = "tag3"
if err := ctx.ServiceClient.RegisterTask("allocid", ctx.Task, ctx.Restarter, nil, nil); err != nil {
if err := ctx.ServiceClient.RegisterTask(ctx.Task); err != nil {
t.Fatalf("unexpected error registering task: %v", err)
}
@@ -737,7 +743,7 @@ func TestConsul_RegServices(t *testing.T) {
}
// Remove the new task
ctx.ServiceClient.RemoveTask("allocid", ctx.Task)
ctx.ServiceClient.RemoveTask(ctx.Task)
if err := ctx.syncOnce(); err != nil {
t.Fatalf("unexpected error syncing task: %v", err)
}
@@ -787,7 +793,7 @@ func TestConsul_ShutdownOK(t *testing.T) {
go ctx.ServiceClient.Run()
// Register a task and agent
if err := ctx.ServiceClient.RegisterTask("allocid", ctx.Task, ctx.Restarter, ctx, nil); err != nil {
if err := ctx.ServiceClient.RegisterTask(ctx.Task); err != nil {
t.Fatalf("unexpected error registering task: %v", err)
}
@@ -849,7 +855,7 @@ func TestConsul_ShutdownSlow(t *testing.T) {
// Make Exec slow, but not too slow
waiter := make(chan struct{})
ctx.ExecFunc = func(ctx context.Context, cmd string, args []string) ([]byte, int, error) {
ctx.MockExec.ExecFunc = func(ctx context.Context, cmd string, args []string) ([]byte, int, error) {
select {
case <-waiter:
default:
@@ -865,7 +871,7 @@ func TestConsul_ShutdownSlow(t *testing.T) {
go ctx.ServiceClient.Run()
// Register a task and agent
if err := ctx.ServiceClient.RegisterTask("allocid", ctx.Task, ctx.Restarter, ctx, nil); err != nil {
if err := ctx.ServiceClient.RegisterTask(ctx.Task); err != nil {
t.Fatalf("unexpected error registering task: %v", err)
}
@@ -924,7 +930,7 @@ func TestConsul_ShutdownBlocked(t *testing.T) {
// Make Exec block forever
waiter := make(chan struct{})
ctx.ExecFunc = func(ctx context.Context, cmd string, args []string) ([]byte, int, error) {
ctx.MockExec.ExecFunc = func(ctx context.Context, cmd string, args []string) ([]byte, int, error) {
close(waiter)
<-block
return []byte{}, 0, nil
@@ -936,7 +942,7 @@ func TestConsul_ShutdownBlocked(t *testing.T) {
go ctx.ServiceClient.Run()
// Register a task and agent
if err := ctx.ServiceClient.RegisterTask("allocid", ctx.Task, ctx.Restarter, ctx, nil); err != nil {
if err := ctx.ServiceClient.RegisterTask(ctx.Task); err != nil {
t.Fatalf("unexpected error registering task: %v", err)
}
@@ -988,7 +994,7 @@ func TestConsul_CancelScript(t *testing.T) {
},
}
if err := ctx.ServiceClient.RegisterTask("allocid", ctx.Task, ctx.Restarter, ctx, nil); err != nil {
if err := ctx.ServiceClient.RegisterTask(ctx.Task); err != nil {
t.Fatalf("unexpected error registering task: %v", err)
}
@@ -1007,7 +1013,7 @@ func TestConsul_CancelScript(t *testing.T) {
for i := 0; i < 2; i++ {
select {
case <-ctx.execs:
case <-ctx.MockExec.execs:
// Script ran as expected!
case <-time.After(3 * time.Second):
t.Fatalf("timed out waiting for script check to run")
@@ -1025,7 +1031,7 @@ func TestConsul_CancelScript(t *testing.T) {
},
}
if err := ctx.ServiceClient.UpdateTask("allocid", origTask, ctx.Task, ctx.Restarter, ctx, nil); err != nil {
if err := ctx.ServiceClient.UpdateTask(origTask, ctx.Task); err != nil {
t.Fatalf("unexpected error registering task: %v", err)
}
@@ -1044,7 +1050,7 @@ func TestConsul_CancelScript(t *testing.T) {
// Make sure exec wasn't called again
select {
case <-ctx.execs:
case <-ctx.MockExec.execs:
t.Errorf("unexpected execution of script; was goroutine not cancelled?")
case <-time.After(100 * time.Millisecond):
// No unexpected script execs
@@ -1104,7 +1110,7 @@ func TestConsul_DriverNetwork_AutoUse(t *testing.T) {
},
}
net := &cstructs.DriverNetwork{
ctx.Task.DriverNetwork = &cstructs.DriverNetwork{
PortMap: map[string]int{
"x": 8888,
"y": 9999,
@@ -1113,7 +1119,7 @@ func TestConsul_DriverNetwork_AutoUse(t *testing.T) {
AutoAdvertise: true,
}
if err := ctx.ServiceClient.RegisterTask("allocid", ctx.Task, ctx.Restarter, ctx, net); err != nil {
if err := ctx.ServiceClient.RegisterTask(ctx.Task); err != nil {
t.Fatalf("unexpected error registering task: %v", err)
}
@@ -1129,9 +1135,9 @@ func TestConsul_DriverNetwork_AutoUse(t *testing.T) {
switch v.Name {
case ctx.Task.Services[0].Name: // x
// Since DriverNetwork.AutoAdvertise=true, driver ports should be used
if v.Port != net.PortMap["x"] {
if v.Port != ctx.Task.DriverNetwork.PortMap["x"] {
t.Errorf("expected service %s's port to be %d but found %d",
v.Name, net.PortMap["x"], v.Port)
v.Name, ctx.Task.DriverNetwork.PortMap["x"], v.Port)
}
// The order of checks in Consul is not guaranteed to
// be the same as their order in the Task definition,
@@ -1159,13 +1165,13 @@ func TestConsul_DriverNetwork_AutoUse(t *testing.T) {
}
case ctx.Task.Services[1].Name: // y
// Service should be container ip:port
if v.Address != net.IP {
if v.Address != ctx.Task.DriverNetwork.IP {
t.Errorf("expected service %s's address to be %s but found %s",
v.Name, net.IP, v.Address)
v.Name, ctx.Task.DriverNetwork.IP, v.Address)
}
if v.Port != net.PortMap["y"] {
if v.Port != ctx.Task.DriverNetwork.PortMap["y"] {
t.Errorf("expected service %s's port to be %d but found %d",
v.Name, net.PortMap["x"], v.Port)
v.Name, ctx.Task.DriverNetwork.PortMap["x"], v.Port)
}
// Check should be host ip:port
if v.Checks[0].TCP != ":1235" { // yPort
@@ -1208,7 +1214,7 @@ func TestConsul_DriverNetwork_NoAutoUse(t *testing.T) {
},
}
net := &cstructs.DriverNetwork{
ctx.Task.DriverNetwork = &cstructs.DriverNetwork{
PortMap: map[string]int{
"x": 8888,
"y": 9999,
@@ -1217,7 +1223,7 @@ func TestConsul_DriverNetwork_NoAutoUse(t *testing.T) {
AutoAdvertise: false,
}
if err := ctx.ServiceClient.RegisterTask("allocid", ctx.Task, ctx.Restarter, ctx, net); err != nil {
if err := ctx.ServiceClient.RegisterTask(ctx.Task); err != nil {
t.Fatalf("unexpected error registering task: %v", err)
}
@@ -1239,13 +1245,13 @@ func TestConsul_DriverNetwork_NoAutoUse(t *testing.T) {
}
case ctx.Task.Services[1].Name: // y + driver mode
// Service should be container ip:port
if v.Address != net.IP {
if v.Address != ctx.Task.DriverNetwork.IP {
t.Errorf("expected service %s's address to be %s but found %s",
v.Name, net.IP, v.Address)
v.Name, ctx.Task.DriverNetwork.IP, v.Address)
}
if v.Port != net.PortMap["y"] {
if v.Port != ctx.Task.DriverNetwork.PortMap["y"] {
t.Errorf("expected service %s's port to be %d but found %d",
v.Name, net.PortMap["x"], v.Port)
v.Name, ctx.Task.DriverNetwork.PortMap["x"], v.Port)
}
case ctx.Task.Services[2].Name: // y + host mode
if v.Port != yPort {
@@ -1272,7 +1278,7 @@ func TestConsul_DriverNetwork_Change(t *testing.T) {
},
}
net := &cstructs.DriverNetwork{
ctx.Task.DriverNetwork = &cstructs.DriverNetwork{
PortMap: map[string]int{
"x": 8888,
"y": 9999,
@@ -1304,31 +1310,63 @@ func TestConsul_DriverNetwork_Change(t *testing.T) {
}
// Initial service should advertise host port x
if err := ctx.ServiceClient.RegisterTask("allocid", ctx.Task, ctx.Restarter, ctx, net); err != nil {
if err := ctx.ServiceClient.RegisterTask(ctx.Task); err != nil {
t.Fatalf("unexpected error registering task: %v", err)
}
syncAndAssertPort(xPort)
// UpdateTask to use Host (shouldn't change anything)
orig := ctx.Task.Copy()
origTask := ctx.Task.Copy()
ctx.Task.Services[0].AddressMode = structs.AddressModeHost
if err := ctx.ServiceClient.UpdateTask("allocid", orig, ctx.Task, ctx.Restarter, ctx, net); err != nil {
if err := ctx.ServiceClient.UpdateTask(origTask, ctx.Task); err != nil {
t.Fatalf("unexpected error updating task: %v", err)
}
syncAndAssertPort(xPort)
// UpdateTask to use Driver (*should* change IP and port)
orig = ctx.Task.Copy()
origTask = ctx.Task.Copy()
ctx.Task.Services[0].AddressMode = structs.AddressModeDriver
if err := ctx.ServiceClient.UpdateTask("allocid", orig, ctx.Task, ctx.Restarter, ctx, net); err != nil {
if err := ctx.ServiceClient.UpdateTask(origTask, ctx.Task); err != nil {
t.Fatalf("unexpected error updating task: %v", err)
}
syncAndAssertPort(net.PortMap["x"])
syncAndAssertPort(ctx.Task.DriverNetwork.PortMap["x"])
}
// TestConsul_CanaryTags asserts CanaryTags are used when Canary=true
func TestConsul_CanaryTags(t *testing.T) {
t.Parallel()
require := require.New(t)
ctx := setupFake(t)
canaryTags := []string{"tag1", "canary"}
ctx.Task.Canary = true
ctx.Task.Services[0].CanaryTags = canaryTags
require.NoError(ctx.ServiceClient.RegisterTask(ctx.Task))
require.NoError(ctx.syncOnce())
require.Len(ctx.FakeConsul.services, 1)
for _, service := range ctx.FakeConsul.services {
require.Equal(canaryTags, service.Tags)
}
// Disable canary and assert tags are not the canary tags
origTask := ctx.Task.Copy()
ctx.Task.Canary = false
require.NoError(ctx.ServiceClient.UpdateTask(origTask, ctx.Task))
require.NoError(ctx.syncOnce())
require.Len(ctx.FakeConsul.services, 1)
for _, service := range ctx.FakeConsul.services {
require.NotEqual(canaryTags, service.Tags)
}
ctx.ServiceClient.RemoveTask(ctx.Task)
require.NoError(ctx.syncOnce())
require.Len(ctx.FakeConsul.services, 0)
}
// TestConsul_PeriodicSync asserts that Nomad periodically reconciles with

View File

@@ -693,13 +693,14 @@ func ApiTgToStructsTG(taskGroup *api.TaskGroup, tg *structs.TaskGroup) {
if taskGroup.Update != nil {
tg.Update = &structs.UpdateStrategy{
Stagger: *taskGroup.Update.Stagger,
MaxParallel: *taskGroup.Update.MaxParallel,
HealthCheck: *taskGroup.Update.HealthCheck,
MinHealthyTime: *taskGroup.Update.MinHealthyTime,
HealthyDeadline: *taskGroup.Update.HealthyDeadline,
AutoRevert: *taskGroup.Update.AutoRevert,
Canary: *taskGroup.Update.Canary,
Stagger: *taskGroup.Update.Stagger,
MaxParallel: *taskGroup.Update.MaxParallel,
HealthCheck: *taskGroup.Update.HealthCheck,
MinHealthyTime: *taskGroup.Update.MinHealthyTime,
HealthyDeadline: *taskGroup.Update.HealthyDeadline,
ProgressDeadline: *taskGroup.Update.ProgressDeadline,
AutoRevert: *taskGroup.Update.AutoRevert,
Canary: *taskGroup.Update.Canary,
}
}
@@ -743,6 +744,7 @@ func ApiTaskToStructsTask(apiTask *api.Task, structsTask *structs.Task) {
Name: service.Name,
PortLabel: service.PortLabel,
Tags: service.Tags,
CanaryTags: service.CanaryTags,
AddressMode: service.AddressMode,
}

View File

@@ -1161,13 +1161,14 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) {
},
},
Update: &api.UpdateStrategy{
Stagger: helper.TimeToPtr(1 * time.Second),
MaxParallel: helper.IntToPtr(5),
HealthCheck: helper.StringToPtr(structs.UpdateStrategyHealthCheck_Manual),
MinHealthyTime: helper.TimeToPtr(1 * time.Minute),
HealthyDeadline: helper.TimeToPtr(3 * time.Minute),
AutoRevert: helper.BoolToPtr(false),
Canary: helper.IntToPtr(1),
Stagger: helper.TimeToPtr(1 * time.Second),
MaxParallel: helper.IntToPtr(5),
HealthCheck: helper.StringToPtr(structs.UpdateStrategyHealthCheck_Manual),
MinHealthyTime: helper.TimeToPtr(1 * time.Minute),
HealthyDeadline: helper.TimeToPtr(3 * time.Minute),
ProgressDeadline: helper.TimeToPtr(3 * time.Minute),
AutoRevert: helper.BoolToPtr(false),
Canary: helper.IntToPtr(1),
},
Periodic: &api.PeriodicConfig{
Enabled: helper.BoolToPtr(true),
@@ -1222,10 +1223,11 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) {
Migrate: helper.BoolToPtr(true),
},
Update: &api.UpdateStrategy{
HealthCheck: helper.StringToPtr(structs.UpdateStrategyHealthCheck_Checks),
MinHealthyTime: helper.TimeToPtr(2 * time.Minute),
HealthyDeadline: helper.TimeToPtr(5 * time.Minute),
AutoRevert: helper.BoolToPtr(true),
HealthCheck: helper.StringToPtr(structs.UpdateStrategyHealthCheck_Checks),
MinHealthyTime: helper.TimeToPtr(2 * time.Minute),
HealthyDeadline: helper.TimeToPtr(5 * time.Minute),
ProgressDeadline: helper.TimeToPtr(5 * time.Minute),
AutoRevert: helper.BoolToPtr(true),
},
Meta: map[string]string{
@@ -1253,10 +1255,11 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) {
Services: []*api.Service{
{
Id: "id",
Name: "serviceA",
Tags: []string{"1", "2"},
PortLabel: "foo",
Id: "id",
Name: "serviceA",
Tags: []string{"1", "2"},
CanaryTags: []string{"3", "4"},
PortLabel: "foo",
CheckRestart: &api.CheckRestart{
Limit: 4,
Grace: helper.TimeToPtr(11 * time.Second),
@@ -1446,13 +1449,14 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) {
Migrate: true,
},
Update: &structs.UpdateStrategy{
Stagger: 1 * time.Second,
MaxParallel: 5,
HealthCheck: structs.UpdateStrategyHealthCheck_Checks,
MinHealthyTime: 2 * time.Minute,
HealthyDeadline: 5 * time.Minute,
AutoRevert: true,
Canary: 1,
Stagger: 1 * time.Second,
MaxParallel: 5,
HealthCheck: structs.UpdateStrategyHealthCheck_Checks,
MinHealthyTime: 2 * time.Minute,
HealthyDeadline: 5 * time.Minute,
ProgressDeadline: 5 * time.Minute,
AutoRevert: true,
Canary: 1,
},
Meta: map[string]string{
"key": "value",
@@ -1480,6 +1484,7 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) {
{
Name: "serviceA",
Tags: []string{"1", "2"},
CanaryTags: []string{"3", "4"},
PortLabel: "foo",
AddressMode: "auto",
Checks: []*structs.ServiceCheck{

View File

@@ -246,34 +246,22 @@ func formatAllocBasicInfo(alloc *api.Allocation, client *api.Client, uuidLength
if alloc.DeploymentID != "" {
health := "unset"
if alloc.DeploymentStatus != nil && alloc.DeploymentStatus.Healthy != nil {
if *alloc.DeploymentStatus.Healthy {
health = "healthy"
} else {
health = "unhealthy"
canary := false
if alloc.DeploymentStatus != nil {
if alloc.DeploymentStatus.Healthy != nil {
if *alloc.DeploymentStatus.Healthy {
health = "healthy"
} else {
health = "unhealthy"
}
}
canary = alloc.DeploymentStatus.Canary
}
basic = append(basic,
fmt.Sprintf("Deployment ID|%s", limit(alloc.DeploymentID, uuidLength)),
fmt.Sprintf("Deployment Health|%s", health))
// Check if this allocation is a canary
deployment, _, err := client.Deployments().Info(alloc.DeploymentID, nil)
if err != nil {
return "", fmt.Errorf("Error querying deployment %q: %s", alloc.DeploymentID, err)
}
canary := false
if state, ok := deployment.TaskGroups[alloc.TaskGroup]; ok {
for _, id := range state.PlacedCanaries {
if id == alloc.ID {
canary = true
break
}
}
}
if canary {
basic = append(basic, fmt.Sprintf("Canary|%v", true))
}

View File

@@ -195,7 +195,7 @@ func formatDeployment(d *api.Deployment, uuidLength int) string {
func formatDeploymentGroups(d *api.Deployment, uuidLength int) string {
// Detect if we need to add these columns
canaries, autorevert := false, false
var canaries, autorevert, progressDeadline bool
tgNames := make([]string, 0, len(d.TaskGroups))
for name, state := range d.TaskGroups {
tgNames = append(tgNames, name)
@@ -205,6 +205,9 @@ func formatDeploymentGroups(d *api.Deployment, uuidLength int) string {
if state.DesiredCanaries > 0 {
canaries = true
}
if state.ProgressDeadline != 0 {
progressDeadline = true
}
}
// Sort the task group names to get a reliable ordering
@@ -223,6 +226,9 @@ func formatDeploymentGroups(d *api.Deployment, uuidLength int) string {
rowString += "Canaries|"
}
rowString += "Placed|Healthy|Unhealthy"
if progressDeadline {
rowString += "|Progress Deadline"
}
rows := make([]string, len(d.TaskGroups)+1)
rows[0] = rowString
@@ -245,6 +251,13 @@ func formatDeploymentGroups(d *api.Deployment, uuidLength int) string {
row += fmt.Sprintf("%d|", state.DesiredCanaries)
}
row += fmt.Sprintf("%d|%d|%d", state.PlacedAllocs, state.HealthyAllocs, state.UnhealthyAllocs)
if progressDeadline {
if state.RequireProgressBy.IsZero() {
row += fmt.Sprintf("|%v", "N/A")
} else {
row += fmt.Sprintf("|%v", formatTime(state.RequireProgressBy))
}
}
rows[i] = row
i++
}