consul: plubming for specifying consul namespace in job/group

This PR adds the common OSS changes for adding support for Consul Namespaces,
which is going to be a Nomad Enterprise feature. There is no new functionality
provided by this changeset and hopefully no new bugs.
This commit is contained in:
Seth Hoenig
2021-03-16 13:22:21 -05:00
parent 5c3399853d
commit a97254fa20
73 changed files with 2078 additions and 529 deletions

View File

@@ -1158,7 +1158,8 @@ func (a *Agent) setupConsul(consulConfig *config.ConsulConfig) error {
}
// Create Consul Agent client for looking info about the agent.
consulAgentClient := consulClient.Agent()
a.consulService = consul.NewServiceClient(consulAgentClient, a.logger, isClient)
namespacesClient := consul.NewNamespacesClient(consulClient.Namespaces())
a.consulService = consul.NewServiceClient(consulAgentClient, namespacesClient, a.logger, isClient)
a.consulProxies = consul.NewConnectProxiesClient(consulAgentClient)
// Run the Consul service client's sync'ing main loop

View File

@@ -40,6 +40,7 @@ const (
ExamplePolicyID1 = "a7c86856-0af5-4ab5-8834-03f4517e5564"
ExamplePolicyID2 = "ffa1b66c-967d-4468-8775-c687b5cfc16e"
ExamplePolicyID3 = "f68f0c36-51f8-4343-97dd-f0d4816c915f"
ExamplePolicyID4 = "1087ff34-b8a0-9bb3-9430-d2f758f52bd3"
)
func (m *MockACLsAPI) PolicyRead(policyID string, _ *api.QueryOptions) (*api.ACLPolicy, *api.QueryMeta, error) {
@@ -66,6 +67,12 @@ service "service1" { policy = "read" }
service "service2" { policy = "write" }`,
}, nil, nil
case ExamplePolicyID4:
return &api.ACLPolicy{
ID: ExamplePolicyID4,
Rules: `key_prefix "" { policy = "read" }`,
}, nil, nil
default:
return nil, nil, errors.New("no such policy")
}
@@ -120,6 +127,7 @@ const (
ExampleOperatorTokenID2 = "868cc216-e123-4c2b-b362-f4d4c087de8e"
ExampleOperatorTokenID3 = "6177d1b9-c0f6-4118-b891-d818a3cb80b1"
ExampleOperatorTokenID4 = "754ae26c-f3cc-e088-d486-9c0d20f5eaea"
ExampleOperatorTokenID5 = "097cbb45-506b-c79c-ec38-82eb0dc0794a"
)
var (
@@ -127,6 +135,7 @@ var (
SecretID: ExampleOperatorTokenID0,
AccessorID: "228865c6-3bf6-6683-df03-06dea2779088 ",
Description: "Operator Token 0",
Namespace: "default",
}
ExampleOperatorToken1 = &api.ACLToken{
@@ -136,6 +145,7 @@ var (
Policies: []*api.ACLTokenPolicyLink{{
ID: ExamplePolicyID1,
}},
Namespace: "default",
}
ExampleOperatorToken2 = &api.ACLToken{
@@ -145,6 +155,7 @@ var (
Policies: []*api.ACLTokenPolicyLink{{
ID: ExamplePolicyID2,
}},
Namespace: "default",
}
ExampleOperatorToken3 = &api.ACLToken{
@@ -154,6 +165,7 @@ var (
Policies: []*api.ACLTokenPolicyLink{{
ID: ExamplePolicyID3,
}},
Namespace: "default",
}
ExampleOperatorToken4 = &api.ACLToken{
@@ -165,6 +177,17 @@ var (
ID: ExampleRoleID1,
Name: "example-role-1",
}},
Namespace: "default",
}
ExampleOperatorToken5 = &api.ACLToken{
SecretID: ExampleOperatorTokenID5,
AccessorID: "cf39aad5-00c3-af23-cf0b-75d41e12f28d",
Description: "Operator Token 5",
Policies: []*api.ACLTokenPolicyLink{{
ID: ExamplePolicyID4,
}},
Namespace: "default",
}
)
@@ -183,6 +206,9 @@ func (m *MockACLsAPI) TokenReadSelf(q *api.QueryOptions) (*api.ACLToken, *api.Qu
case ExampleOperatorTokenID4:
return ExampleOperatorToken4, nil, nil
case ExampleOperatorTokenID5:
return ExampleOperatorToken5, nil, nil
default:
return nil, nil, errors.New("no such token")
}
@@ -253,6 +279,7 @@ func (m *MockACLsAPI) tokenCreate(token *api.ACLToken, _ *api.WriteOptions) (uin
SecretID: uuid.Generate(),
Description: token.Description,
ServiceIdentities: token.ServiceIdentities,
Namespace: token.Namespace,
CreateTime: time.Now(),
}

View File

@@ -2,6 +2,7 @@ package consul
import (
"fmt"
"sort"
"sync"
"github.com/hashicorp/consul/api"
@@ -9,11 +10,48 @@ import (
"github.com/hashicorp/nomad/helper"
)
// MockNamespaces is a mock implementation of NamespaceAPI.
type MockNamespaces struct {
namespaces []*api.Namespace
}
var _ NamespaceAPI = (*MockNamespaces)(nil)
// NewMockNamespaces creates a MockNamespaces with the given namespaces, and
// will automatically add the "default" namespace if not included.
func NewMockNamespaces(namespaces []string) *MockNamespaces {
list := helper.CopySliceString(namespaces)
if !helper.SliceStringContains(list, "default") {
list = append(list, "default")
}
sort.Strings(list)
data := make([]*api.Namespace, 0, len(list))
for _, namespace := range list {
data = append(data, &api.Namespace{
Name: namespace,
})
}
return &MockNamespaces{
namespaces: data,
}
}
// List implements NamespaceAPI
func (m *MockNamespaces) List(*api.QueryOptions) ([]*api.Namespace, *api.QueryMeta, error) {
result := make([]*api.Namespace, len(m.namespaces))
copy(result, m.namespaces)
return result, new(api.QueryMeta), nil
}
// MockCatalog can be used for testing where the CatalogAPI is needed.
type MockCatalog struct {
logger hclog.Logger
}
var _ CatalogAPI = (*MockCatalog)(nil)
func NewMockCatalog(l hclog.Logger) *MockCatalog {
return &MockCatalog{logger: l.Named("mock_consul")}
}
@@ -31,9 +69,11 @@ func (m *MockCatalog) Service(service, tag string, q *api.QueryOptions) ([]*api.
// MockAgent is a fake in-memory Consul backend for ServiceClient.
type MockAgent struct {
// maps of what services and checks have been registered
services map[string]*api.AgentServiceRegistration
checks map[string]*api.AgentCheckRegistration
// services tracks what services have been registered, per namespace
services map[string]map[string]*api.AgentServiceRegistration
// checks tracks what checks have been registered, per namespace
checks map[string]map[string]*api.AgentCheckRegistration
// hits is the total number of times agent methods have been called
hits int
@@ -41,19 +81,21 @@ type MockAgent struct {
// mu guards above fields
mu sync.Mutex
// when UpdateTTL is called the check ID will have its counter inc'd
checkTTLs map[string]int
// checkTTLS counts calls to UpdateTTL for each check, per namespace
checkTTLs map[string]map[string]int
// What check status to return from Checks()
checkStatus string
}
var _ AgentAPI = (*MockAgent)(nil)
// NewMockAgent that returns all checks as passing.
func NewMockAgent() *MockAgent {
return &MockAgent{
services: make(map[string]*api.AgentServiceRegistration),
checks: make(map[string]*api.AgentCheckRegistration),
checkTTLs: make(map[string]int),
services: make(map[string]map[string]*api.AgentServiceRegistration),
checks: make(map[string]map[string]*api.AgentCheckRegistration),
checkTTLs: make(map[string]map[string]int),
checkStatus: api.HealthPassing,
}
}
@@ -109,13 +151,23 @@ func (c *MockAgent) Self() (map[string]map[string]interface{}, error) {
return s, nil
}
func (c *MockAgent) Services() (map[string]*api.AgentService, error) {
func getNamespace(q *api.QueryOptions) string {
if q == nil || q.Namespace == "" {
return "default"
}
return q.Namespace
}
// ServicesWithFilterOpts implements AgentAPI
func (c *MockAgent) ServicesWithFilterOpts(_ string, q *api.QueryOptions) (map[string]*api.AgentService, error) {
c.mu.Lock()
defer c.mu.Unlock()
c.hits++
namespace := getNamespace(q)
r := make(map[string]*api.AgentService, len(c.services))
for k, v := range c.services {
for k, v := range c.services[namespace] {
r[k] = &api.AgentService{
ID: v.ID,
Service: v.Name,
@@ -130,104 +182,152 @@ func (c *MockAgent) Services() (map[string]*api.AgentService, error) {
return r, nil
}
// Checks implements the Agent API Checks method.
func (c *MockAgent) Checks() (map[string]*api.AgentCheck, error) {
// ChecksWithFilterOpts implements AgentAPI
func (c *MockAgent) ChecksWithFilterOpts(_ string, q *api.QueryOptions) (map[string]*api.AgentCheck, error) {
c.mu.Lock()
defer c.mu.Unlock()
c.hits++
namespace := getNamespace(q)
r := make(map[string]*api.AgentCheck, len(c.checks))
for k, v := range c.checks {
for k, v := range c.checks[namespace] {
r[k] = &api.AgentCheck{
CheckID: v.ID,
Name: v.Name,
Status: c.checkStatus,
Notes: v.Notes,
ServiceID: v.ServiceID,
ServiceName: c.services[v.ServiceID].Name,
ServiceName: c.services[namespace][v.ServiceID].Name,
}
}
return r, nil
}
// CheckRegs returns the raw AgentCheckRegistrations registered with this mock
// agent.
// agent, across all namespaces.
func (c *MockAgent) CheckRegs() []*api.AgentCheckRegistration {
c.mu.Lock()
defer c.mu.Unlock()
regs := make([]*api.AgentCheckRegistration, 0, len(c.checks))
for _, check := range c.checks {
regs = append(regs, check)
for namespace := range c.checks {
for _, check := range c.checks[namespace] {
regs = append(regs, check)
}
}
return regs
}
// CheckRegister implements AgentAPI
func (c *MockAgent) CheckRegister(check *api.AgentCheckRegistration) error {
c.mu.Lock()
defer c.mu.Unlock()
c.hits++
c.checks[check.ID] = check
// Consul will set empty Namespace to default, do the same here
if check.Namespace == "" {
check.Namespace = "default"
}
if c.checks[check.Namespace] == nil {
c.checks[check.Namespace] = make(map[string]*api.AgentCheckRegistration)
}
c.checks[check.Namespace][check.ID] = check
// Be nice and make checks reachable-by-service
scheck := check.AgentServiceCheck
c.services[check.ServiceID].Checks = append(c.services[check.ServiceID].Checks, &scheck)
serviceCheck := check.AgentServiceCheck
if c.services[check.Namespace] == nil {
c.services[check.Namespace] = make(map[string]*api.AgentServiceRegistration)
}
c.services[check.Namespace][check.ServiceID].Checks = append(c.services[check.Namespace][check.ServiceID].Checks, &serviceCheck)
return nil
}
func (c *MockAgent) CheckDeregister(checkID string) error {
// CheckDeregisterOpts implements AgentAPI
func (c *MockAgent) CheckDeregisterOpts(checkID string, q *api.QueryOptions) error {
c.mu.Lock()
defer c.mu.Unlock()
c.hits++
delete(c.checks, checkID)
delete(c.checkTTLs, checkID)
namespace := getNamespace(q)
delete(c.checks[namespace], checkID)
delete(c.checkTTLs[namespace], checkID)
return nil
}
// ServiceRegister implements AgentAPI
func (c *MockAgent) ServiceRegister(service *api.AgentServiceRegistration) error {
c.mu.Lock()
defer c.mu.Unlock()
c.hits++
c.services[service.ID] = service
// Consul will set empty Namespace to default, do the same here
if service.Namespace == "" {
service.Namespace = "default"
}
if c.services[service.Namespace] == nil {
c.services[service.Namespace] = make(map[string]*api.AgentServiceRegistration)
}
c.services[service.Namespace][service.ID] = service
return nil
}
func (c *MockAgent) ServiceDeregister(serviceID string) error {
// ServiceDeregisterOpts implements AgentAPI
func (c *MockAgent) ServiceDeregisterOpts(serviceID string, q *api.QueryOptions) error {
c.mu.Lock()
defer c.mu.Unlock()
c.hits++
delete(c.services, serviceID)
for k, v := range c.checks {
namespace := getNamespace(q)
delete(c.services[namespace], serviceID)
for k, v := range c.checks[namespace] {
if v.ServiceID == serviceID {
delete(c.checks, k)
delete(c.checkTTLs, k)
delete(c.checks[namespace], k)
delete(c.checkTTLs[namespace], k)
}
}
return nil
}
func (c *MockAgent) UpdateTTL(id string, output string, status string) error {
// UpdateTTLOpts implements AgentAPI
func (c *MockAgent) UpdateTTLOpts(id string, output string, status string, q *api.QueryOptions) error {
c.mu.Lock()
defer c.mu.Unlock()
c.hits++
check, ok := c.checks[id]
if !ok {
return fmt.Errorf("unknown check id: %q", id)
c.hits++
namespace := getNamespace(q)
checks, nsExists := c.checks[namespace]
if !nsExists {
return fmt.Errorf("unknown checks namespace: %q", namespace)
}
check, checkExists := checks[id]
if !checkExists {
return fmt.Errorf("unknown check: %s/%s", namespace, id)
}
// Flip initial status to passing
// todo(shoenig) why not just set to the given status?
check.Status = "passing"
c.checkTTLs[id]++
c.checkTTLs[namespace][id]++
return nil
}
// a convenience method for looking up a registered service by name
func (c *MockAgent) lookupService(name string) []*api.AgentServiceRegistration {
func (c *MockAgent) lookupService(namespace, name string) []*api.AgentServiceRegistration {
c.mu.Lock()
defer c.mu.Unlock()
var services []*api.AgentServiceRegistration
for _, service := range c.services {
for _, service := range c.services[namespace] {
if service.Name == name {
services = append(services, service)
}

View File

@@ -18,8 +18,7 @@ const (
// ChecksAPI is the part of the Consul API the checkWatcher requires.
type ChecksAPI interface {
// Checks returns a list of all checks.
Checks() (map[string]*api.AgentCheck, error)
ChecksWithFilterOpts(filter string, q *api.QueryOptions) (map[string]*api.AgentCheck, error)
}
// WorkloadRestarter allows the checkWatcher to restart tasks or entire task groups.
@@ -141,7 +140,8 @@ type checkWatchUpdate struct {
// checkWatcher watches Consul checks and restarts tasks when they're
// unhealthy.
type checkWatcher struct {
consul ChecksAPI
namespacesClient *NamespacesClient
checksAPI ChecksAPI
// pollFreq is how often to poll the checks API and defaults to
// defaultPollFreq
@@ -162,13 +162,14 @@ type checkWatcher struct {
}
// newCheckWatcher creates a new checkWatcher but does not call its Run method.
func newCheckWatcher(logger log.Logger, consul ChecksAPI) *checkWatcher {
func newCheckWatcher(logger log.Logger, checksAPI ChecksAPI, namespacesClient *NamespacesClient) *checkWatcher {
return &checkWatcher{
consul: consul,
pollFreq: defaultPollFreq,
checkUpdateCh: make(chan checkWatchUpdate, 8),
done: make(chan struct{}),
logger: logger.ResetNamed("consul.health"),
namespacesClient: namespacesClient,
checksAPI: checksAPI,
pollFreq: defaultPollFreq,
checkUpdateCh: make(chan checkWatchUpdate, 8),
done: make(chan struct{}),
logger: logger.ResetNamed("consul.health"),
}
}
@@ -196,6 +197,7 @@ func (w *checkWatcher) Run(ctx context.Context) {
stopTimer()
// Main watch loop
WATCHER:
for {
// disable polling if there are no checks
if len(checks) == 0 {
@@ -230,13 +232,30 @@ func (w *checkWatcher) Run(ctx context.Context) {
// Set "now" as the point in time the following check results represent
now := time.Now()
results, err := w.consul.Checks()
// Get the list of all namespaces so we can iterate them.
namespaces, err := w.namespacesClient.List()
if err != nil {
if !w.lastErr {
w.lastErr = true
w.logger.Error("failed retrieving health checks", "error", err)
w.logger.Error("failed retrieving namespaces", "error", err)
}
continue WATCHER
}
checkResults := make(map[string]*api.AgentCheck)
for _, namespace := range namespaces {
nsResults, err := w.checksAPI.ChecksWithFilterOpts("", &api.QueryOptions{Namespace: normalizeNamespace(namespace)})
if err != nil {
if !w.lastErr {
w.lastErr = true
w.logger.Error("failed retrieving health checks", "error", err)
}
continue WATCHER
} else {
for k, v := range nsResults {
checkResults[k] = v
}
}
continue
}
w.lastErr = false
@@ -259,11 +278,11 @@ func (w *checkWatcher) Run(ctx context.Context) {
continue
}
result, ok := results[cid]
result, ok := checkResults[cid]
if !ok {
// Only warn if outside grace period to avoid races with check registration
if now.After(check.graceUntil) {
w.logger.Warn("watched check not found in Consul", "check", check.checkName, "check_id", cid)
// w.logger.Warn("watched check not found in Consul", "check", check.checkName, "check_id", cid) // add back
}
continue
}

View File

@@ -123,7 +123,7 @@ func (c *fakeChecksAPI) add(id, status string, at time.Time) {
c.mu.Unlock()
}
func (c *fakeChecksAPI) Checks() (map[string]*api.AgentCheck, error) {
func (c *fakeChecksAPI) ChecksWithFilterOpts(filter string, opts *api.QueryOptions) (map[string]*api.AgentCheck, error) {
c.mu.Lock()
defer c.mu.Unlock()
now := time.Now()
@@ -149,10 +149,12 @@ func (c *fakeChecksAPI) Checks() (map[string]*api.AgentCheck, error) {
// testWatcherSetup sets up a fakeChecksAPI and a real checkWatcher with a test
// logger and faster poll frequency.
func testWatcherSetup(t *testing.T) (*fakeChecksAPI, *checkWatcher) {
fakeAPI := newFakeChecksAPI()
cw := newCheckWatcher(testlog.HCLogger(t), fakeAPI)
logger := testlog.HCLogger(t)
checksAPI := newFakeChecksAPI()
namespacesClient := NewNamespacesClient(NewMockNamespaces(nil))
cw := newCheckWatcher(logger, checksAPI, namespacesClient)
cw.pollFreq = 10 * time.Millisecond
return fakeAPI, cw
return checksAPI, cw
}
func testCheck() *structs.ServiceCheck {
@@ -176,7 +178,11 @@ func TestCheckWatcher_Skip(t *testing.T) {
check := testCheck()
check.CheckRestart = nil
cw := newCheckWatcher(testlog.HCLogger(t), newFakeChecksAPI())
logger := testlog.HCLogger(t)
checksAPI := newFakeChecksAPI()
namespacesClient := NewNamespacesClient(NewMockNamespaces(nil))
cw := newCheckWatcher(logger, checksAPI, namespacesClient)
restarter1 := newFakeCheckRestarter(cw, "testalloc1", "testtask1", "testcheck1", check)
cw.Watch("testalloc1", "testtask1", "testcheck1", check, restarter1)

View File

@@ -32,7 +32,8 @@ func TestConsul_Connect(t *testing.T) {
consulConfig.Address = testconsul.HTTPAddr
consulClient, err := consulapi.NewClient(consulConfig)
require.NoError(t, err)
serviceClient := NewServiceClient(consulClient.Agent(), testlog.HCLogger(t), true)
namespacesClient := NewNamespacesClient(consulClient.Namespaces())
serviceClient := NewServiceClient(consulClient.Agent(), namespacesClient, testlog.HCLogger(t), true)
// Lower periodicInterval to ensure periodic syncing doesn't improperly
// remove Connect services.

View File

@@ -135,7 +135,8 @@ func TestConsul_Integration(t *testing.T) {
consulClient, err := consulapi.NewClient(consulConfig)
r.Nil(err)
serviceClient := consul.NewServiceClient(consulClient.Agent(), testlog.HCLogger(t), true)
namespacesClient := consul.NewNamespacesClient(consulClient.Namespaces())
serviceClient := consul.NewServiceClient(consulClient.Agent(), namespacesClient, testlog.HCLogger(t), true)
defer serviceClient.Shutdown() // just-in-case cleanup
consulRan := make(chan struct{})
go func() {

View File

@@ -0,0 +1,42 @@
package consul
import (
"sort"
"strings"
)
// NamespacesClient is a wrapper for the Consul NamespacesAPI, that is used to
// deal with Consul OSS vs Consul Enterprise behavior in listing namespaces.
type NamespacesClient struct {
namespacesAPI NamespaceAPI
}
// NewNamespacesClient returns a NamespacesClient backed by a NamespaceAPI.
func NewNamespacesClient(namespacesAPI NamespaceAPI) *NamespacesClient {
return &NamespacesClient{
namespacesAPI: namespacesAPI,
}
}
// List returns a list of Consul Namespaces.
//
// If using Consul OSS, the list is a single element with the "default" namespace,
// even though the response from Consul OSS is an error.
func (ns *NamespacesClient) List() ([]string, error) {
namespaces, _, err := ns.namespacesAPI.List(nil)
if err != nil {
// check if the error was a 404, indicating Consul is the OSS version
// which does not have the /v1/namespace handler
if strings.Contains(err.Error(), "response code: 404") {
return []string{"default"}, nil
}
return nil, err
}
result := make([]string, 0, len(namespaces))
for _, namespace := range namespaces {
result = append(result, namespace.Name)
}
sort.Strings(result)
return result, nil
}

View File

@@ -12,8 +12,9 @@ import (
"sync/atomic"
"time"
metrics "github.com/armon/go-metrics"
"github.com/armon/go-metrics"
log "github.com/hashicorp/go-hclog"
"github.com/pkg/errors"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/nomad/helper"
@@ -89,20 +90,28 @@ type CatalogAPI interface {
Service(service, tag string, q *api.QueryOptions) ([]*api.CatalogService, *api.QueryMeta, error)
}
// NamespaceAPI is the consul/api.Namespace API used by Nomad.
//
// ACL requirements
// - operator:read OR namespace:*:read
type NamespaceAPI interface {
List(q *api.QueryOptions) ([]*api.Namespace, *api.QueryMeta, error)
}
// AgentAPI is the consul/api.Agent API used by Nomad.
//
// ACL requirements
// - agent:read
// - service:write
type AgentAPI interface {
Services() (map[string]*api.AgentService, error)
Checks() (map[string]*api.AgentCheck, error)
ServicesWithFilterOpts(filter string, q *api.QueryOptions) (map[string]*api.AgentService, error)
ChecksWithFilterOpts(filter string, q *api.QueryOptions) (map[string]*api.AgentCheck, error)
CheckRegister(check *api.AgentCheckRegistration) error
CheckDeregister(checkID string) error
CheckDeregisterOpts(checkID string, q *api.QueryOptions) error
Self() (map[string]map[string]interface{}, error)
ServiceRegister(service *api.AgentServiceRegistration) error
ServiceDeregister(serviceID string) error
UpdateTTL(id, output, status string) error
ServiceDeregisterOpts(serviceID string, q *api.QueryOptions) error
UpdateTTLOpts(id, output, status string, q *api.QueryOptions) error
}
// ConfigAPI is the consul/api.ConfigEntries API subset used by Nomad Server.
@@ -373,7 +382,9 @@ func (s *ServiceRegistration) copy() *ServiceRegistration {
// ServiceClient handles task and agent service registration with Consul.
type ServiceClient struct {
client AgentAPI
agentAPI AgentAPI
namespacesClient *NamespacesClient
logger log.Logger
retryInterval time.Duration
maxRetryInterval time.Duration
@@ -402,8 +413,8 @@ type ServiceClient struct {
allocRegistrations map[string]*AllocRegistration
allocRegistrationsLock sync.RWMutex
// agent services and checks record entries for the agent itself which
// should be removed on shutdown
// Nomad agent services and checks that are recorded so they can be removed
// on shutdown. Defers to consul namespace specified in client consul config.
agentServices map[string]struct{}
agentChecks map[string]struct{}
agentLock sync.Mutex
@@ -429,10 +440,11 @@ type ServiceClient struct {
// Client, logger and takes whether the client is being used by a Nomad Client agent.
// When being used by a Nomad client, this Consul client reconciles all services and
// checks created by Nomad on behalf of running tasks.
func NewServiceClient(consulClient AgentAPI, logger log.Logger, isNomadClient bool) *ServiceClient {
func NewServiceClient(agentAPI AgentAPI, namespacesClient *NamespacesClient, logger log.Logger, isNomadClient bool) *ServiceClient {
logger = logger.ResetNamed("consul.sync")
return &ServiceClient{
client: consulClient,
agentAPI: agentAPI,
namespacesClient: namespacesClient,
logger: logger,
retryInterval: defaultRetryInterval,
maxRetryInterval: defaultMaxRetryInterval,
@@ -448,7 +460,7 @@ func NewServiceClient(consulClient AgentAPI, logger log.Logger, isNomadClient bo
allocRegistrations: make(map[string]*AllocRegistration),
agentServices: make(map[string]struct{}),
agentChecks: make(map[string]struct{}),
checkWatcher: newCheckWatcher(logger, consulClient),
checkWatcher: newCheckWatcher(logger, agentAPI, namespacesClient),
isClientAgent: isNomadClient,
deregisterProbationExpiry: time.Now().Add(deregisterProbationPeriod),
}
@@ -492,7 +504,7 @@ func (c *ServiceClient) Run() {
// init will be closed when Consul has been contacted
init := make(chan struct{})
go checkConsulTLSSkipVerify(ctx, c.logger, c.client, init)
go checkConsulTLSSkipVerify(ctx, c.logger, c.agentAPI, init)
// Process operations while waiting for initial contact with Consul but
// do not sync until contact has been made.
@@ -604,8 +616,8 @@ func (c *ServiceClient) commit(ops *operations) {
}
func (c *ServiceClient) clearExplicitlyDeregistered() {
c.explicitlyDeregisteredServices = map[string]bool{}
c.explicitlyDeregisteredChecks = map[string]bool{}
c.explicitlyDeregisteredServices = make(map[string]bool)
c.explicitlyDeregisteredChecks = make(map[string]bool)
}
// merge registrations into state map prior to sync'ing with Consul
@@ -631,17 +643,34 @@ func (c *ServiceClient) merge(ops *operations) {
// sync enqueued operations.
func (c *ServiceClient) sync(reason syncReason) error {
sreg, creg, sdereg, cdereg := 0, 0, 0, 0
var err error
consulServices, err := c.client.Services()
// Get the list of all namespaces created so we can iterate them.
namespaces, err := c.namespacesClient.List()
if err != nil {
metrics.IncrCounter([]string{"client", "consul", "sync_failure"}, 1)
return fmt.Errorf("error querying Consul services: %v", err)
return errors.Wrap(err, "failed to query Consul namespaces")
}
// Accumulate all services in Consul across all namespaces.
servicesInConsul := make(map[string]*api.AgentService)
for _, namespace := range namespaces {
if nsServices, err := c.agentAPI.ServicesWithFilterOpts("", &api.QueryOptions{Namespace: normalizeNamespace(namespace)}); err != nil {
metrics.IncrCounter([]string{"client", "consul", "sync_failure"}, 1)
return errors.Wrap(err, "failed to query Consul services")
} else {
for k, v := range nsServices {
servicesInConsul[k] = v
}
}
}
// Compute whether we are still in probation period where we will avoid
// de-registering services.
inProbation := time.Now().Before(c.deregisterProbationExpiry)
// Remove Nomad services in Consul but unknown locally
for id := range consulServices {
// Remove Nomad services in Consul but unknown to Nomad.
for id := range servicesInConsul {
if _, ok := c.services[id]; ok {
// Known service, skip
continue
@@ -667,7 +696,8 @@ func (c *ServiceClient) sync(reason syncReason) error {
}
// Unknown Nomad managed service; kill
if err := c.client.ServiceDeregister(id); err != nil {
ns := servicesInConsul[id].Namespace
if err := c.agentAPI.ServiceDeregisterOpts(id, &api.QueryOptions{Namespace: ns}); err != nil {
if isOldNomadService(id) {
// Don't hard-fail on old entries. See #3620
continue
@@ -683,11 +713,11 @@ func (c *ServiceClient) sync(reason syncReason) error {
// Add Nomad services missing from Consul, or where the service has been updated.
for id, serviceInNomad := range c.services {
serviceInConsul, exists := consulServices[id]
sidecarInConsul := getNomadSidecar(id, consulServices)
serviceInConsul, exists := servicesInConsul[id]
sidecarInConsul := getNomadSidecar(id, servicesInConsul)
if !exists || agentServiceUpdateRequired(reason, serviceInNomad, serviceInConsul, sidecarInConsul) {
if err = c.client.ServiceRegister(serviceInNomad); err != nil {
if err = c.agentAPI.ServiceRegister(serviceInNomad); err != nil {
metrics.IncrCounter([]string{"client", "consul", "sync_failure"}, 1)
return err
}
@@ -697,14 +727,20 @@ func (c *ServiceClient) sync(reason syncReason) error {
}
consulChecks, err := c.client.Checks()
if err != nil {
metrics.IncrCounter([]string{"client", "consul", "sync_failure"}, 1)
return fmt.Errorf("error querying Consul checks: %v", err)
checksInConsul := make(map[string]*api.AgentCheck)
for _, namespace := range namespaces {
nsChecks, err := c.agentAPI.ChecksWithFilterOpts("", &api.QueryOptions{Namespace: normalizeNamespace(namespace)})
if err != nil {
metrics.IncrCounter([]string{"client", "consul", "sync_failure"}, 1)
return errors.Wrap(err, "failed to query Consul checks")
}
for k, v := range nsChecks {
checksInConsul[k] = v
}
}
// Remove Nomad checks in Consul but unknown locally
for id, check := range consulChecks {
for id, check := range checksInConsul {
if _, ok := c.checks[id]; ok {
// Known check, leave it
continue
@@ -730,7 +766,7 @@ func (c *ServiceClient) sync(reason syncReason) error {
}
// Unknown Nomad managed check; remove
if err := c.client.CheckDeregister(id); err != nil {
if err := c.agentAPI.CheckDeregisterOpts(id, &api.QueryOptions{Namespace: check.Namespace}); err != nil {
if isOldNomadService(check.ServiceID) {
// Don't hard-fail on old entries.
continue
@@ -745,12 +781,11 @@ func (c *ServiceClient) sync(reason syncReason) error {
// Add Nomad checks missing from Consul
for id, check := range c.checks {
if _, ok := consulChecks[id]; ok {
if _, ok := checksInConsul[id]; ok {
// Already in Consul; skipping
continue
}
if err := c.client.CheckRegister(check); err != nil {
if err := c.agentAPI.CheckRegister(check); err != nil {
metrics.IncrCounter([]string{"client", "consul", "sync_failure"}, 1)
return err
}
@@ -820,7 +855,7 @@ func (c *ServiceClient) RegisterAgent(role string, services []*structs.Service)
}
checkHost, checkPort = host, port
}
checkReg, err := createCheckReg(id, checkID, check, checkHost, checkPort)
checkReg, err := createCheckReg(id, checkID, check, checkHost, checkPort, "") // todo ... whats up with agent namespace and its checks?
if err != nil {
return fmt.Errorf("failed to add check %q: %v", check.Name, err)
}
@@ -934,6 +969,7 @@ func (c *ServiceClient) serviceRegs(ops *operations, service *structs.Service, w
Kind: kind,
ID: id,
Name: service.Name,
Namespace: workload.ConsulNamespace,
Tags: tags,
EnableTagOverride: service.EnableTagOverride,
Address: ip,
@@ -986,12 +1022,11 @@ func (c *ServiceClient) checkRegs(serviceID string, service *structs.Service,
}
checkID := MakeCheckID(serviceID, check)
registration, err := createCheckReg(serviceID, checkID, check, ip, port)
registration, err := createCheckReg(serviceID, checkID, check, ip, port, workload.ConsulNamespace)
if err != nil {
return nil, fmt.Errorf("failed to add check %q: %v", check.Name, err)
}
sreg.CheckOnUpdate[checkID] = check.OnUpdate
registrations = append(registrations, registration)
}
@@ -1193,8 +1228,18 @@ func (c *ServiceClient) RemoveWorkload(workload *WorkloadServices) {
c.commit(&ops)
}
// normalizeNamespace will turn the "default" namespace into the empty string,
// so that Consul OSS will not produce an error setting something in the default
// namespace.
func normalizeNamespace(namespace string) string {
if namespace == "default" {
return ""
}
return namespace
}
// AllocRegistrations returns the registrations for the given allocation. If the
// allocation has no reservations, the response is a nil object.
// allocation has no registrations, the response is a nil object.
func (c *ServiceClient) AllocRegistrations(allocID string) (*AllocRegistration, error) {
// Get the internal struct using the lock
c.allocRegistrationsLock.RLock()
@@ -1208,15 +1253,32 @@ func (c *ServiceClient) AllocRegistrations(allocID string) (*AllocRegistration,
reg := regInternal.copy()
c.allocRegistrationsLock.RUnlock()
// Query the services and checks to populate the allocation registrations.
services, err := c.client.Services()
// Get the list of all namespaces created so we can iterate them.
namespaces, err := c.namespacesClient.List()
if err != nil {
return nil, err
return nil, errors.Wrap(err, "failed to retrieve namespaces from consul")
}
checks, err := c.client.Checks()
if err != nil {
return nil, err
services := make(map[string]*api.AgentService)
checks := make(map[string]*api.AgentCheck)
// Query the services and checks to populate the allocation registrations.
for _, namespace := range namespaces {
nsServices, err := c.agentAPI.ServicesWithFilterOpts("", &api.QueryOptions{Namespace: normalizeNamespace(namespace)})
if err != nil {
return nil, errors.Wrap(err, "failed to retrieve services from consul")
}
for k, v := range nsServices {
services[k] = v
}
nsChecks, err := c.agentAPI.ChecksWithFilterOpts("", &api.QueryOptions{Namespace: normalizeNamespace(namespace)})
if err != nil {
return nil, errors.Wrap(err, "failed to retrieve checks from consul")
}
for k, v := range nsChecks {
checks[k] = v
}
}
// Populate the object
@@ -1236,8 +1298,8 @@ func (c *ServiceClient) AllocRegistrations(allocID string) (*AllocRegistration,
// UpdateTTL is used to update the TTL of a check. Typically this will only be
// called to heartbeat script checks.
func (c *ServiceClient) UpdateTTL(id, output, status string) error {
return c.client.UpdateTTL(id, output, status)
func (c *ServiceClient) UpdateTTL(id, namespace, output, status string) error {
return c.agentAPI.UpdateTTLOpts(id, output, status, &api.QueryOptions{Namespace: normalizeNamespace(namespace)})
}
// Shutdown the Consul client. Update running task registrations and deregister
@@ -1273,14 +1335,25 @@ func (c *ServiceClient) Shutdown() error {
// Always attempt to deregister Nomad agent Consul entries, even if
// deadline was reached
for id := range c.agentServices {
if err := c.client.ServiceDeregister(id); err != nil {
if err := c.agentAPI.ServiceDeregisterOpts(id, nil); err != nil {
c.logger.Error("failed deregistering agent service", "service_id", id, "error", err)
}
}
remainingChecks, err := c.client.Checks()
namespaces, err := c.namespacesClient.List()
if err != nil {
c.logger.Error("failed listing remaining checks after deregistering services", "error", err)
c.logger.Error("failed to retrieve namespaces from consul", "error", err)
}
remainingChecks := make(map[string]*api.AgentCheck)
for _, namespace := range namespaces {
nsChecks, err := c.agentAPI.ChecksWithFilterOpts("", &api.QueryOptions{Namespace: normalizeNamespace(namespace)})
if err != nil {
c.logger.Error("failed to retrieve checks from consul", "error", err)
}
for k, v := range nsChecks {
remainingChecks[k] = v
}
}
checkRemains := func(id string) bool {
@@ -1296,7 +1369,8 @@ func (c *ServiceClient) Shutdown() error {
// if we couldn't populate remainingChecks it is unlikely that CheckDeregister will work, but try anyway
// if we could list the remaining checks, verify that the check we store still exists before removing it.
if remainingChecks == nil || checkRemains(id) {
if err := c.client.CheckDeregister(id); err != nil {
ns := remainingChecks[id].Namespace
if err := c.agentAPI.CheckDeregisterOpts(id, &api.QueryOptions{Namespace: ns}); err != nil {
c.logger.Error("failed deregistering agent check", "check_id", id, "error", err)
}
}
@@ -1370,11 +1444,12 @@ func MakeCheckID(serviceID string, check *structs.ServiceCheck) string {
//
// Script checks simply have a TTL set and the caller is responsible for
// running the script and heart-beating.
func createCheckReg(serviceID, checkID string, check *structs.ServiceCheck, host string, port int) (*api.AgentCheckRegistration, error) {
func createCheckReg(serviceID, checkID string, check *structs.ServiceCheck, host string, port int, namespace string) (*api.AgentCheckRegistration, error) {
chkReg := api.AgentCheckRegistration{
ID: checkID,
Name: check.Name,
ServiceID: serviceID,
Namespace: normalizeNamespace(namespace),
}
chkReg.Status = check.InitialStatus
chkReg.Timeout = check.Timeout.String()

View File

@@ -336,9 +336,10 @@ func TestSyncLogic_maybeTweakTags_emptySC(t *testing.T) {
func TestServiceRegistration_CheckOnUpdate(t *testing.T) {
t.Parallel()
mock := NewMockAgent()
mockAgent := NewMockAgent()
namespacesClient := NewNamespacesClient(NewMockNamespaces(nil))
logger := testlog.HCLogger(t)
sc := NewServiceClient(mock, logger, true)
sc := NewServiceClient(mockAgent, namespacesClient, logger, true)
allocID := uuid.Generate()
ws := &WorkloadServices{

View File

@@ -9,18 +9,21 @@ import (
)
// WorkloadServices describes services defined in either a Task or TaskGroup
// that need to be syncronized with Consul
// that need to be syncronized with Consul.
type WorkloadServices struct {
AllocID string
// Name of the task and task group the services are defined for. For
// group based services, Task will be empty
// group based services, Task will be empty.
Task string
Group string
// Canary indicates whether or not the allocation is a canary
// Canary indicates whether or not the allocation is a canary.
Canary bool
// ConsulNamespace is the consul namespace in which services will be registered.
ConsulNamespace string
// Restarter allows restarting the task or task group depending on the
// check_restart stanzas.
Restarter WorkloadRestarter
@@ -32,16 +35,16 @@ type WorkloadServices struct {
// TODO: remove and use Ports
Networks structs.Networks
// NetworkStatus from alloc if network namespace is created
// Can be nil
// NetworkStatus from alloc if network namespace is created.
// Can be nil.
NetworkStatus *structs.AllocNetworkStatus
// AllocatedPorts is the list of port mappings
// AllocatedPorts is the list of port mappings.
Ports structs.AllocatedPorts
// DriverExec is the script executor for the task's driver.
// For group services this is nil and script execution is managed by
// a tasklet in the taskrunner script_check_hook
// a tasklet in the taskrunner script_check_hook.
DriverExec interfaces.ScriptExecutor
// DriverNetwork is the network specified by the driver and may be nil.

View File

@@ -106,10 +106,11 @@ func (t *testFakeCtx) syncOnce(reason syncReason) error {
// A test Workload is also provided.
func setupFake(t *testing.T) *testFakeCtx {
fc := NewMockAgent()
nsc := NewNamespacesClient(NewMockNamespaces(nil))
tw := testWorkload()
// by default start fake client being out of probation
sc := NewServiceClient(fc, testlog.HCLogger(t), true)
sc := NewServiceClient(fc, nsc, testlog.HCLogger(t), true)
sc.deregisterProbationExpiry = time.Now().Add(-1 * time.Minute)
return &testFakeCtx{
@@ -135,7 +136,7 @@ func TestConsul_ChangeTags(t *testing.T) {
r.Equal(1, reg1.NumServices())
r.Equal(0, reg1.NumChecks())
serviceBefore := ctx.FakeConsul.lookupService("taskname-service")[0]
serviceBefore := ctx.FakeConsul.lookupService("default", "taskname-service")[0]
r.Equal(serviceBefore.Name, ctx.Workload.Services[0].Name)
r.Equal(serviceBefore.Tags, ctx.Workload.Services[0].Tags)
@@ -149,7 +150,7 @@ func TestConsul_ChangeTags(t *testing.T) {
r.Equal(1, len(ctx.FakeConsul.services), "Expected 1 service to be registered with Consul")
// Validate the consul service definition changed
serviceAfter := ctx.FakeConsul.lookupService("taskname-service")[0]
serviceAfter := ctx.FakeConsul.lookupService("default", "taskname-service")[0]
r.Equal(serviceAfter.Name, ctx.Workload.Services[0].Name)
r.Equal(serviceAfter.Tags, ctx.Workload.Services[0].Tags)
r.Equal("new-tag", serviceAfter.Tags[0])
@@ -177,25 +178,25 @@ func TestConsul_EnableTagOverride_Syncs(t *testing.T) {
const service = "taskname-service"
// check things are what we expect
consulServiceDefBefore := ctx.FakeConsul.lookupService(service)[0]
consulServiceDefBefore := ctx.FakeConsul.lookupService("default", service)[0]
r.Equal(ctx.Workload.Services[0].Name, consulServiceDefBefore.Name)
r.Equal([]string{"tag1", "tag2"}, consulServiceDefBefore.Tags)
r.True(consulServiceDefBefore.EnableTagOverride)
// manually set the tags in consul
ctx.FakeConsul.lookupService(service)[0].Tags = []string{"new", "tags"}
ctx.FakeConsul.lookupService("default", service)[0].Tags = []string{"new", "tags"}
// do a periodic sync (which will respect EnableTagOverride)
r.NoError(ctx.syncOnce(syncPeriodic))
r.Equal(1, len(ctx.FakeConsul.services))
consulServiceDefAfter := ctx.FakeConsul.lookupService(service)[0]
consulServiceDefAfter := ctx.FakeConsul.lookupService("default", service)[0]
r.Equal([]string{"new", "tags"}, consulServiceDefAfter.Tags) // manually set tags should still be there
// now do a new-ops sync (which will override EnableTagOverride)
r.NoError(ctx.ServiceClient.RegisterWorkload(ctx.Workload))
r.NoError(ctx.syncOnce(syncNewOps))
r.Equal(1, len(ctx.FakeConsul.services))
consulServiceDefUpdated := ctx.FakeConsul.lookupService(service)[0]
consulServiceDefUpdated := ctx.FakeConsul.lookupService("default", service)[0]
r.Equal([]string{"tag1", "tag2"}, consulServiceDefUpdated.Tags) // jobspec tags should be set now
}
@@ -233,20 +234,20 @@ func TestConsul_ChangePorts(t *testing.T) {
require.NoError(ctx.ServiceClient.RegisterWorkload(ctx.Workload))
require.NoError(ctx.syncOnce(syncNewOps))
require.Equal(1, len(ctx.FakeConsul.services), "Expected 1 service to be registered with Consul")
require.Equal(1, len(ctx.FakeConsul.services["default"]), "Expected 1 service to be registered with Consul")
for _, v := range ctx.FakeConsul.services {
for _, v := range ctx.FakeConsul.services["default"] {
require.Equal(ctx.Workload.Services[0].Name, v.Name)
require.Equal(ctx.Workload.Services[0].Tags, v.Tags)
require.Equal(xPort, v.Port)
}
require.Len(ctx.FakeConsul.checks, 3)
require.Len(ctx.FakeConsul.checks["default"], 3)
origTCPKey := ""
origScriptKey := ""
origHTTPKey := ""
for k, v := range ctx.FakeConsul.checks {
for k, v := range ctx.FakeConsul.checks["default"] {
switch v.Name {
case "c1":
origTCPKey = k
@@ -295,17 +296,17 @@ func TestConsul_ChangePorts(t *testing.T) {
require.NoError(ctx.ServiceClient.UpdateWorkload(origWorkload, ctx.Workload))
require.NoError(ctx.syncOnce(syncNewOps))
require.Equal(1, len(ctx.FakeConsul.services), "Expected 1 service to be registered with Consul")
require.Equal(1, len(ctx.FakeConsul.services["default"]), "Expected 1 service to be registered with Consul")
for _, v := range ctx.FakeConsul.services {
for _, v := range ctx.FakeConsul.services["default"] {
require.Equal(ctx.Workload.Services[0].Name, v.Name)
require.Equal(ctx.Workload.Services[0].Tags, v.Tags)
require.Equal(yPort, v.Port)
}
require.Equal(3, len(ctx.FakeConsul.checks))
require.Equal(3, len(ctx.FakeConsul.checks["default"]))
for k, v := range ctx.FakeConsul.checks {
for k, v := range ctx.FakeConsul.checks["default"] {
switch v.Name {
case "c1":
// C1 is changed because the service was re-registered
@@ -348,8 +349,8 @@ func TestConsul_ChangeChecks(t *testing.T) {
t.Fatalf("unexpected error syncing task: %v", err)
}
if n := len(ctx.FakeConsul.services); n != 1 {
t.Fatalf("expected 1 service but found %d:\n%#v", n, ctx.FakeConsul.services)
if n := len(ctx.FakeConsul.services["default"]); n != 1 {
t.Fatalf("expected 1 service but found %d:\n%#v", n, ctx.FakeConsul.services["default"])
}
// Assert a check restart watch update was enqueued and clear it
@@ -376,7 +377,7 @@ func TestConsul_ChangeChecks(t *testing.T) {
}
origServiceKey := ""
for k, v := range ctx.FakeConsul.services {
for k, v := range ctx.FakeConsul.services["default"] {
origServiceKey = k
if v.Name != ctx.Workload.Services[0].Name {
t.Errorf("expected Name=%q != %q", ctx.Workload.Services[0].Name, v.Name)
@@ -386,10 +387,10 @@ func TestConsul_ChangeChecks(t *testing.T) {
}
}
if n := len(ctx.FakeConsul.checks); n != 1 {
t.Fatalf("expected 1 check but found %d:\n%#v", n, ctx.FakeConsul.checks)
if n := len(ctx.FakeConsul.checks["default"]); n != 1 {
t.Fatalf("expected 1 check but found %d:\n%#v", n, ctx.FakeConsul.checks["default"])
}
for _, v := range ctx.FakeConsul.checks {
for _, v := range ctx.FakeConsul.checks["default"] {
if v.Name != "c1" {
t.Fatalf("expected check c1 but found %q", v.Name)
}
@@ -444,19 +445,19 @@ func TestConsul_ChangeChecks(t *testing.T) {
t.Fatalf("unexpected error syncing task: %v", err)
}
if n := len(ctx.FakeConsul.services); n != 1 {
t.Fatalf("expected 1 service but found %d:\n%#v", n, ctx.FakeConsul.services)
if n := len(ctx.FakeConsul.services["default"]); n != 1 {
t.Fatalf("expected 1 service but found %d:\n%#v", n, ctx.FakeConsul.services["default"])
}
if _, ok := ctx.FakeConsul.services[origServiceKey]; !ok {
if _, ok := ctx.FakeConsul.services["default"][origServiceKey]; !ok {
t.Errorf("unexpected key change; was: %q -- but found %#v", origServiceKey, ctx.FakeConsul.services)
}
if n := len(ctx.FakeConsul.checks); n != 2 {
t.Fatalf("expected 2 check but found %d:\n%#v", n, ctx.FakeConsul.checks)
if n := len(ctx.FakeConsul.checks["default"]); n != 2 {
t.Fatalf("expected 2 check but found %d:\n%#v", n, ctx.FakeConsul.checks["default"])
}
for k, v := range ctx.FakeConsul.checks {
for k, v := range ctx.FakeConsul.checks["default"] {
switch v.Name {
case "c1":
if expected := fmt.Sprintf(":%d", xPort); v.TCP != expected {
@@ -538,11 +539,11 @@ func TestConsul_ChangeChecks(t *testing.T) {
t.Fatalf("unexpected error syncing task: %v", err)
}
if n := len(ctx.FakeConsul.checks); n != 2 {
t.Fatalf("expected 2 check but found %d:\n%#v", n, ctx.FakeConsul.checks)
if n := len(ctx.FakeConsul.checks["default"]); n != 2 {
t.Fatalf("expected 2 check but found %d:\n%#v", n, ctx.FakeConsul.checks["default"])
}
for k, v := range ctx.FakeConsul.checks {
for k, v := range ctx.FakeConsul.checks["default"] {
if v.Name == "c1" {
if k != c1ID {
t.Errorf("expected c1 to still have id %q but found %q", c1ID, k)
@@ -582,11 +583,11 @@ func TestConsul_RegServices(t *testing.T) {
t.Fatalf("unexpected error syncing task: %v", err)
}
if n := len(ctx.FakeConsul.services); n != 1 {
t.Fatalf("expected 1 service but found %d:\n%#v", n, ctx.FakeConsul.services)
if n := len(ctx.FakeConsul.services["default"]); n != 1 {
t.Fatalf("expected 1 service but found %d:\n%#v", n, ctx.FakeConsul.services["default"])
}
for _, v := range ctx.FakeConsul.services {
for _, v := range ctx.FakeConsul.services["default"] {
if v.Name != ctx.Workload.Services[0].Name {
t.Errorf("expected Name=%q != %q", ctx.Workload.Services[0].Name, v.Name)
}
@@ -632,10 +633,10 @@ func TestConsul_RegServices(t *testing.T) {
// Make sure changes don't take affect until sync() is called (since
// Run() isn't running)
if n := len(ctx.FakeConsul.services); n != 1 {
t.Fatalf("expected 1 service but found %d:\n%#v", n, ctx.FakeConsul.services)
if n := len(ctx.FakeConsul.services["default"]); n != 1 {
t.Fatalf("expected 1 service but found %d:\n%#v", n, ctx.FakeConsul.services["default"])
}
for _, v := range ctx.FakeConsul.services {
for _, v := range ctx.FakeConsul.services["default"] {
if reflect.DeepEqual(v.Tags, ctx.Workload.Services[0].Tags) {
t.Errorf("expected Tags to differ, changes applied before sync()")
}
@@ -645,11 +646,11 @@ func TestConsul_RegServices(t *testing.T) {
if err := ctx.syncOnce(syncNewOps); err != nil {
t.Fatalf("unexpected error syncing task: %v", err)
}
if n := len(ctx.FakeConsul.services); n != 2 {
t.Fatalf("expected 2 services but found %d:\n%#v", n, ctx.FakeConsul.services)
if n := len(ctx.FakeConsul.services["default"]); n != 2 {
t.Fatalf("expected 2 services but found %d:\n%#v", n, ctx.FakeConsul.services["default"])
}
found := false
for _, v := range ctx.FakeConsul.services {
for _, v := range ctx.FakeConsul.services["default"] {
if v.Name == ctx.Workload.Services[0].Name {
if found {
t.Fatalf("found new service name %q twice", v.Name)
@@ -669,10 +670,10 @@ func TestConsul_RegServices(t *testing.T) {
if err := ctx.syncOnce(syncNewOps); err != nil {
t.Fatalf("unexpected error syncing task: %v", err)
}
if n := len(ctx.FakeConsul.services); n != 1 {
t.Fatalf("expected 1 service but found %d:\n%#v", n, ctx.FakeConsul.services)
if n := len(ctx.FakeConsul.services["default"]); n != 1 {
t.Fatalf("expected 1 service but found %d:\n%#v", n, ctx.FakeConsul.services["default"])
}
for _, v := range ctx.FakeConsul.services {
for _, v := range ctx.FakeConsul.services["default"] {
if v.Name != "taskname-service" {
t.Errorf("expected original task to survive not %q", v.Name)
}
@@ -721,15 +722,14 @@ func TestConsul_ShutdownOK(t *testing.T) {
require.Eventually(ctx.ServiceClient.hasSeen, time.Second, 10*time.Millisecond)
// assert successful registration
require.Len(ctx.FakeConsul.services, 1, "expected agent service to be registered")
require.Len(ctx.FakeConsul.checks, 1, "expected agent check to be registered")
require.Contains(ctx.FakeConsul.services,
makeAgentServiceID("client", agentServices[0]))
require.Len(ctx.FakeConsul.services["default"], 1, "expected agent service to be registered")
require.Len(ctx.FakeConsul.checks["default"], 1, "expected agent check to be registered")
require.Contains(ctx.FakeConsul.services["default"], makeAgentServiceID("client", agentServices[0]))
// Shutdown() should block until Nomad agent service/check is deregistered
require.NoError(ctx.ServiceClient.Shutdown())
require.Len(ctx.FakeConsul.services, 0, "expected agent service to be deregistered")
require.Len(ctx.FakeConsul.checks, 0, "expected agent check to be deregistered")
require.Len(ctx.FakeConsul.services["default"], 0, "expected agent service to be deregistered")
require.Len(ctx.FakeConsul.checks["default"], 0, "expected agent check to be deregistered")
}
// TestConsul_ShutdownBlocked tests the blocked past deadline path for the
@@ -763,8 +763,8 @@ func TestConsul_ShutdownBlocked(t *testing.T) {
}
require.NoError(ctx.ServiceClient.RegisterAgent("client", agentServices))
require.Eventually(ctx.ServiceClient.hasSeen, time.Second, 10*time.Millisecond)
require.Len(ctx.FakeConsul.services, 1, "expected agent service to be registered")
require.Len(ctx.FakeConsul.checks, 1, "expected agent check to be registered")
require.Len(ctx.FakeConsul.services["default"], 1, "expected agent service to be registered")
require.Len(ctx.FakeConsul.checks["default"], 1, "expected agent check to be registered")
// prevent normal shutdown by blocking Consul. the shutdown should wait
// until agent deregistration has finished
@@ -792,9 +792,9 @@ func TestConsul_ShutdownBlocked(t *testing.T) {
"expected shutdown to take >200ms and <1s")
require.Greater(shutdownTime, 200*time.Millisecond.Seconds(),
"expected shutdown to take >200ms and <1s")
require.Len(ctx.FakeConsul.services, 0,
require.Len(ctx.FakeConsul.services["default"], 0,
"expected agent service to be deregistered")
require.Len(ctx.FakeConsul.checks, 0,
require.Len(ctx.FakeConsul.checks["default"], 0,
"expected agent check to be deregistered")
}
@@ -863,11 +863,11 @@ func TestConsul_DriverNetwork_AutoUse(t *testing.T) {
t.Fatalf("unexpected error syncing task: %v", err)
}
if n := len(ctx.FakeConsul.services); n != 3 {
if n := len(ctx.FakeConsul.services["default"]); n != 3 {
t.Fatalf("expected 2 services but found: %d", n)
}
for _, v := range ctx.FakeConsul.services {
for _, v := range ctx.FakeConsul.services["default"] {
switch v.Name {
case ctx.Workload.Services[0].Name: // x
// Since DriverNetwork.AutoAdvertise=true, driver ports should be used
@@ -967,11 +967,11 @@ func TestConsul_DriverNetwork_NoAutoUse(t *testing.T) {
t.Fatalf("unexpected error syncing task: %v", err)
}
if n := len(ctx.FakeConsul.services); n != 3 {
if n := len(ctx.FakeConsul.services["default"]); n != 3 {
t.Fatalf("expected 3 services but found: %d", n)
}
for _, v := range ctx.FakeConsul.services {
for _, v := range ctx.FakeConsul.services["default"] {
switch v.Name {
case ctx.Workload.Services[0].Name: // x + auto
// Since DriverNetwork.AutoAdvertise=false, host ports should be used
@@ -1028,11 +1028,11 @@ func TestConsul_DriverNetwork_Change(t *testing.T) {
t.Fatalf("unexpected error syncing task: %v", err)
}
if n := len(ctx.FakeConsul.services); n != 1 {
if n := len(ctx.FakeConsul.services["default"]); n != 1 {
t.Fatalf("expected 1 service but found: %d", n)
}
for _, v := range ctx.FakeConsul.services {
for _, v := range ctx.FakeConsul.services["default"] {
switch v.Name {
case ctx.Workload.Services[0].Name:
if v.Port != port {
@@ -1085,8 +1085,8 @@ func TestConsul_CanaryTags(t *testing.T) {
require.NoError(ctx.ServiceClient.RegisterWorkload(ctx.Workload))
require.NoError(ctx.syncOnce(syncNewOps))
require.Len(ctx.FakeConsul.services, 1)
for _, service := range ctx.FakeConsul.services {
require.Len(ctx.FakeConsul.services["default"], 1)
for _, service := range ctx.FakeConsul.services["default"] {
require.Equal(canaryTags, service.Tags)
}
@@ -1095,14 +1095,14 @@ func TestConsul_CanaryTags(t *testing.T) {
ctx.Workload.Canary = false
require.NoError(ctx.ServiceClient.UpdateWorkload(origWorkload, ctx.Workload))
require.NoError(ctx.syncOnce(syncNewOps))
require.Len(ctx.FakeConsul.services, 1)
for _, service := range ctx.FakeConsul.services {
require.Len(ctx.FakeConsul.services["default"], 1)
for _, service := range ctx.FakeConsul.services["default"] {
require.NotEqual(canaryTags, service.Tags)
}
ctx.ServiceClient.RemoveWorkload(ctx.Workload)
require.NoError(ctx.syncOnce(syncNewOps))
require.Len(ctx.FakeConsul.services, 0)
require.Len(ctx.FakeConsul.services["default"], 0)
}
// TestConsul_CanaryTags_NoTags asserts Tags are used when Canary=true and there
@@ -1118,8 +1118,8 @@ func TestConsul_CanaryTags_NoTags(t *testing.T) {
require.NoError(ctx.ServiceClient.RegisterWorkload(ctx.Workload))
require.NoError(ctx.syncOnce(syncNewOps))
require.Len(ctx.FakeConsul.services, 1)
for _, service := range ctx.FakeConsul.services {
require.Len(ctx.FakeConsul.services["default"], 1)
for _, service := range ctx.FakeConsul.services["default"] {
require.Equal(tags, service.Tags)
}
@@ -1128,14 +1128,14 @@ func TestConsul_CanaryTags_NoTags(t *testing.T) {
ctx.Workload.Canary = false
require.NoError(ctx.ServiceClient.UpdateWorkload(origWorkload, ctx.Workload))
require.NoError(ctx.syncOnce(syncNewOps))
require.Len(ctx.FakeConsul.services, 1)
for _, service := range ctx.FakeConsul.services {
require.Len(ctx.FakeConsul.services["default"], 1)
for _, service := range ctx.FakeConsul.services["default"] {
require.Equal(tags, service.Tags)
}
ctx.ServiceClient.RemoveWorkload(ctx.Workload)
require.NoError(ctx.syncOnce(syncNewOps))
require.Len(ctx.FakeConsul.services, 0)
require.Len(ctx.FakeConsul.services["default"], 0)
}
// TestConsul_CanaryMeta asserts CanaryMeta are used when Canary=true
@@ -1151,8 +1151,8 @@ func TestConsul_CanaryMeta(t *testing.T) {
require.NoError(ctx.ServiceClient.RegisterWorkload(ctx.Workload))
require.NoError(ctx.syncOnce(syncNewOps))
require.Len(ctx.FakeConsul.services, 1)
for _, service := range ctx.FakeConsul.services {
require.Len(ctx.FakeConsul.services["default"], 1)
for _, service := range ctx.FakeConsul.services["default"] {
require.Equal(canaryMeta, service.Meta)
}
@@ -1161,14 +1161,14 @@ func TestConsul_CanaryMeta(t *testing.T) {
ctx.Workload.Canary = false
require.NoError(ctx.ServiceClient.UpdateWorkload(origWorkload, ctx.Workload))
require.NoError(ctx.syncOnce(syncNewOps))
require.Len(ctx.FakeConsul.services, 1)
for _, service := range ctx.FakeConsul.services {
require.Len(ctx.FakeConsul.services["default"], 1)
for _, service := range ctx.FakeConsul.services["default"] {
require.NotEqual(canaryMeta, service.Meta)
}
ctx.ServiceClient.RemoveWorkload(ctx.Workload)
require.NoError(ctx.syncOnce(syncNewOps))
require.Len(ctx.FakeConsul.services, 0)
require.Len(ctx.FakeConsul.services["default"], 0)
}
// TestConsul_CanaryMeta_NoMeta asserts Meta are used when Canary=true and there
@@ -1185,8 +1185,8 @@ func TestConsul_CanaryMeta_NoMeta(t *testing.T) {
require.NoError(ctx.ServiceClient.RegisterWorkload(ctx.Workload))
require.NoError(ctx.syncOnce(syncNewOps))
require.Len(ctx.FakeConsul.services, 1)
for _, service := range ctx.FakeConsul.services {
require.Len(ctx.FakeConsul.services["default"], 1)
for _, service := range ctx.FakeConsul.services["default"] {
require.Equal(meta, service.Meta)
}
@@ -1195,14 +1195,14 @@ func TestConsul_CanaryMeta_NoMeta(t *testing.T) {
ctx.Workload.Canary = false
require.NoError(ctx.ServiceClient.UpdateWorkload(origWorkload, ctx.Workload))
require.NoError(ctx.syncOnce(syncNewOps))
require.Len(ctx.FakeConsul.services, 1)
for _, service := range ctx.FakeConsul.services {
require.Len(ctx.FakeConsul.services["default"], 1)
for _, service := range ctx.FakeConsul.services["default"] {
require.Equal(meta, service.Meta)
}
ctx.ServiceClient.RemoveWorkload(ctx.Workload)
require.NoError(ctx.syncOnce(syncNewOps))
require.Len(ctx.FakeConsul.services, 0)
require.Len(ctx.FakeConsul.services["default"], 0)
}
// TestConsul_PeriodicSync asserts that Nomad periodically reconciles with
@@ -1284,8 +1284,10 @@ func TestCreateCheckReg_HTTP(t *testing.T) {
checkID := check.Hash(serviceID)
host := "localhost"
port := 41111
namespace := ""
expected := &api.AgentCheckRegistration{
Namespace: namespace,
ID: checkID,
Name: "name",
ServiceID: serviceID,
@@ -1300,7 +1302,7 @@ func TestCreateCheckReg_HTTP(t *testing.T) {
},
}
actual, err := createCheckReg(serviceID, checkID, check, host, port)
actual, err := createCheckReg(serviceID, checkID, check, host, port, namespace)
if err != nil {
t.Fatalf("err: %v", err)
}
@@ -1329,6 +1331,7 @@ func TestCreateCheckReg_GRPC(t *testing.T) {
checkID := check.Hash(serviceID)
expected := &api.AgentCheckRegistration{
Namespace: "",
ID: checkID,
Name: "name",
ServiceID: serviceID,
@@ -1341,7 +1344,7 @@ func TestCreateCheckReg_GRPC(t *testing.T) {
},
}
actual, err := createCheckReg(serviceID, checkID, check, "localhost", 8080)
actual, err := createCheckReg(serviceID, checkID, check, "localhost", 8080, "default")
require.NoError(t, err)
require.Equal(t, expected, actual)
}
@@ -1747,9 +1750,9 @@ func TestConsul_ServiceName_Duplicates(t *testing.T) {
require.NoError(ctx.syncOnce(syncNewOps))
require.Len(ctx.FakeConsul.services, 3)
require.Len(ctx.FakeConsul.services["default"], 3)
for _, v := range ctx.FakeConsul.services {
for _, v := range ctx.FakeConsul.services["default"] {
if v.Name == ctx.Workload.Services[0].Name && v.Port == xPort {
require.ElementsMatch(v.Tags, ctx.Workload.Services[0].Tags)
require.Len(v.Checks, 1)
@@ -1815,8 +1818,8 @@ func TestConsul_ServiceDeregistration_OutProbation(t *testing.T) {
require.NoError(ctx.ServiceClient.RegisterWorkload(explicitlyRemovedWorkload))
require.NoError(ctx.syncOnce(syncNewOps))
require.Len(ctx.FakeConsul.services, 2)
require.Len(ctx.FakeConsul.checks, 2)
require.Len(ctx.FakeConsul.services["default"], 2)
require.Len(ctx.FakeConsul.checks["default"], 2)
// we register a task through nomad API then remove it out of band
outofbandWorkload := testWorkload()
@@ -1840,7 +1843,7 @@ func TestConsul_ServiceDeregistration_OutProbation(t *testing.T) {
require.NoError(ctx.ServiceClient.RegisterWorkload(outofbandWorkload))
require.NoError(ctx.syncOnce(syncNewOps))
require.Len(ctx.FakeConsul.services, 3)
require.Len(ctx.FakeConsul.services["default"], 3)
// remove outofbandWorkload from local services so it appears unknown to client
require.Len(ctx.ServiceClient.services, 3)
@@ -1857,16 +1860,16 @@ func TestConsul_ServiceDeregistration_OutProbation(t *testing.T) {
ctx.ServiceClient.RemoveWorkload(explicitlyRemovedWorkload)
require.NoError(ctx.syncOnce(syncNewOps))
require.NoError(ctx.ServiceClient.sync(syncNewOps))
require.Len(ctx.FakeConsul.services, 1)
require.Len(ctx.FakeConsul.checks, 1)
require.Len(ctx.FakeConsul.services["default"], 1)
require.Len(ctx.FakeConsul.checks["default"], 1)
require.Contains(ctx.FakeConsul.services, remainingWorkloadServiceID)
require.NotContains(ctx.FakeConsul.services, outofbandWorkloadServiceID)
require.NotContains(ctx.FakeConsul.services, explicitlyRemovedWorkloadServiceID)
require.Contains(ctx.FakeConsul.services["default"], remainingWorkloadServiceID)
require.NotContains(ctx.FakeConsul.services["default"], outofbandWorkloadServiceID)
require.NotContains(ctx.FakeConsul.services["default"], explicitlyRemovedWorkloadServiceID)
require.Contains(ctx.FakeConsul.checks, MakeCheckID(remainingWorkloadServiceID, remainingWorkload.Services[0].Checks[0]))
require.NotContains(ctx.FakeConsul.checks, MakeCheckID(outofbandWorkloadServiceID, outofbandWorkload.Services[0].Checks[0]))
require.NotContains(ctx.FakeConsul.checks, MakeCheckID(explicitlyRemovedWorkloadServiceID, explicitlyRemovedWorkload.Services[0].Checks[0]))
require.Contains(ctx.FakeConsul.checks["default"], MakeCheckID(remainingWorkloadServiceID, remainingWorkload.Services[0].Checks[0]))
require.NotContains(ctx.FakeConsul.checks["default"], MakeCheckID(outofbandWorkloadServiceID, outofbandWorkload.Services[0].Checks[0]))
require.NotContains(ctx.FakeConsul.checks["default"], MakeCheckID(explicitlyRemovedWorkloadServiceID, explicitlyRemovedWorkload.Services[0].Checks[0]))
}
// TestConsul_ServiceDeregistration_InProbation asserts that during initialization
@@ -1924,8 +1927,8 @@ func TestConsul_ServiceDeregistration_InProbation(t *testing.T) {
require.NoError(ctx.ServiceClient.RegisterWorkload(explicitlyRemovedWorkload))
require.NoError(ctx.syncOnce(syncNewOps))
require.Len(ctx.FakeConsul.services, 2)
require.Len(ctx.FakeConsul.checks, 2)
require.Len(ctx.FakeConsul.services["default"], 2)
require.Len(ctx.FakeConsul.checks["default"], 2)
// we register a task through nomad API then remove it out of band
outofbandWorkload := testWorkload()
@@ -1949,7 +1952,7 @@ func TestConsul_ServiceDeregistration_InProbation(t *testing.T) {
require.NoError(ctx.ServiceClient.RegisterWorkload(outofbandWorkload))
require.NoError(ctx.syncOnce(syncNewOps))
require.Len(ctx.FakeConsul.services, 3)
require.Len(ctx.FakeConsul.services["default"], 3)
// remove outofbandWorkload from local services so it appears unknown to client
require.Len(ctx.ServiceClient.services, 3)
@@ -1966,30 +1969,29 @@ func TestConsul_ServiceDeregistration_InProbation(t *testing.T) {
ctx.ServiceClient.RemoveWorkload(explicitlyRemovedWorkload)
require.NoError(ctx.syncOnce(syncNewOps))
require.NoError(ctx.ServiceClient.sync(syncNewOps))
require.Len(ctx.FakeConsul.services, 2)
require.Len(ctx.FakeConsul.checks, 2)
require.Len(ctx.FakeConsul.services["default"], 2)
require.Len(ctx.FakeConsul.checks["default"], 2)
require.Contains(ctx.FakeConsul.services, remainingWorkloadServiceID)
require.Contains(ctx.FakeConsul.services, outofbandWorkloadServiceID)
require.NotContains(ctx.FakeConsul.services, explicitlyRemovedWorkloadServiceID)
require.Contains(ctx.FakeConsul.services["default"], remainingWorkloadServiceID)
require.Contains(ctx.FakeConsul.services["default"], outofbandWorkloadServiceID)
require.NotContains(ctx.FakeConsul.services["default"], explicitlyRemovedWorkloadServiceID)
require.Contains(ctx.FakeConsul.checks, MakeCheckID(remainingWorkloadServiceID, remainingWorkload.Services[0].Checks[0]))
require.Contains(ctx.FakeConsul.checks, MakeCheckID(outofbandWorkloadServiceID, outofbandWorkload.Services[0].Checks[0]))
require.NotContains(ctx.FakeConsul.checks, MakeCheckID(explicitlyRemovedWorkloadServiceID, explicitlyRemovedWorkload.Services[0].Checks[0]))
require.Contains(ctx.FakeConsul.checks["default"], MakeCheckID(remainingWorkloadServiceID, remainingWorkload.Services[0].Checks[0]))
require.Contains(ctx.FakeConsul.checks["default"], MakeCheckID(outofbandWorkloadServiceID, outofbandWorkload.Services[0].Checks[0]))
require.NotContains(ctx.FakeConsul.checks["default"], MakeCheckID(explicitlyRemovedWorkloadServiceID, explicitlyRemovedWorkload.Services[0].Checks[0]))
// after probation, outofband services and checks are removed
ctx.ServiceClient.deregisterProbationExpiry = time.Now().Add(-1 * time.Hour)
require.NoError(ctx.ServiceClient.sync(syncNewOps))
require.Len(ctx.FakeConsul.services, 1)
require.Len(ctx.FakeConsul.checks, 1)
require.Len(ctx.FakeConsul.services["default"], 1)
require.Len(ctx.FakeConsul.checks["default"], 1)
require.Contains(ctx.FakeConsul.services, remainingWorkloadServiceID)
require.NotContains(ctx.FakeConsul.services, outofbandWorkloadServiceID)
require.NotContains(ctx.FakeConsul.services, explicitlyRemovedWorkloadServiceID)
require.Contains(ctx.FakeConsul.checks, MakeCheckID(remainingWorkloadServiceID, remainingWorkload.Services[0].Checks[0]))
require.NotContains(ctx.FakeConsul.checks, MakeCheckID(outofbandWorkloadServiceID, outofbandWorkload.Services[0].Checks[0]))
require.NotContains(ctx.FakeConsul.checks, MakeCheckID(explicitlyRemovedWorkloadServiceID, explicitlyRemovedWorkload.Services[0].Checks[0]))
require.Contains(ctx.FakeConsul.services["default"], remainingWorkloadServiceID)
require.NotContains(ctx.FakeConsul.services["default"], outofbandWorkloadServiceID)
require.NotContains(ctx.FakeConsul.services["default"], explicitlyRemovedWorkloadServiceID)
require.Contains(ctx.FakeConsul.checks["default"], MakeCheckID(remainingWorkloadServiceID, remainingWorkload.Services[0].Checks[0]))
require.NotContains(ctx.FakeConsul.checks["default"], MakeCheckID(outofbandWorkloadServiceID, outofbandWorkload.Services[0].Checks[0]))
require.NotContains(ctx.FakeConsul.checks["default"], MakeCheckID(explicitlyRemovedWorkloadServiceID, explicitlyRemovedWorkload.Services[0].Checks[0]))
}

View File

@@ -7,7 +7,6 @@ import (
"strings"
"github.com/golang/snappy"
"github.com/hashicorp/nomad/api"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/jobspec"
@@ -877,6 +876,7 @@ func ApiTgToStructsTG(job *structs.Job, taskGroup *api.TaskGroup, tg *structs.Ta
tg.Affinities = ApiAffinitiesToStructs(taskGroup.Affinities)
tg.Networks = ApiNetworkResourceToStructs(taskGroup.Networks)
tg.Services = ApiServicesToStructs(taskGroup.Services)
tg.Consul = apiConsulToStructs(taskGroup.Consul)
tg.RestartPolicy = &structs.RestartPolicy{
Attempts: *taskGroup.RestartPolicy.Attempts,
@@ -1575,6 +1575,15 @@ func apiConnectSidecarTaskToStructs(in *api.SidecarTask) *structs.SidecarTask {
}
}
func apiConsulToStructs(in *api.Consul) *structs.Consul {
if in == nil {
return nil
}
return &structs.Consul{
Namespace: in.Namespace,
}
}
func apiLogConfigToStructs(in *api.LogConfig) *structs.LogConfig {
if in == nil {
return nil

View File

@@ -1928,6 +1928,9 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) {
Meta: map[string]string{
"key": "value",
},
Consul: &api.Consul{
Namespace: "team-foo",
},
Services: []*api.Service{
{
Name: "groupserviceA",
@@ -2304,6 +2307,9 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) {
Meta: map[string]string{
"key": "value",
},
Consul: &structs.Consul{
Namespace: "team-foo",
},
Services: []*structs.Service{
{
Name: "groupserviceA",
@@ -2581,6 +2587,9 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) {
Meta: map[string]string{
"key": "value",
},
Consul: &api.Consul{
Namespace: "foo",
},
Tasks: []*api.Task{
{
Name: "task1",
@@ -2698,6 +2707,9 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) {
Meta: map[string]string{
"key": "value",
},
Consul: &structs.Consul{
Namespace: "foo",
},
Tasks: []*structs.Task{
{
Name: "task1",

View File

@@ -87,7 +87,7 @@ func commandAssetsConnectShortNomad() (*asset, error) {
return nil, err
}
info := bindataFileInfo{name: "command/assets/connect-short.nomad", size: 997, mode: os.FileMode(436), modTime: time.Unix(1612560436, 0)}
info := bindataFileInfo{name: "command/assets/connect-short.nomad", size: 997, mode: os.FileMode(436), modTime: time.Unix(1616684356, 0)}
a := &asset{bytes: bytes, info: info}
return a, nil
}
@@ -107,7 +107,7 @@ func commandAssetsConnectNomad() (*asset, error) {
return nil, err
}
info := bindataFileInfo{name: "command/assets/connect.nomad", size: 17842, mode: os.FileMode(436), modTime: time.Unix(1612560436, 0)}
info := bindataFileInfo{name: "command/assets/connect.nomad", size: 17842, mode: os.FileMode(436), modTime: time.Unix(1616684356, 0)}
a := &asset{bytes: bytes, info: info}
return a, nil
}
@@ -127,7 +127,7 @@ func commandAssetsExampleShortNomad() (*asset, error) {
return nil, err
}
info := bindataFileInfo{name: "command/assets/example-short.nomad", size: 324, mode: os.FileMode(436), modTime: time.Unix(1612560436, 0)}
info := bindataFileInfo{name: "command/assets/example-short.nomad", size: 324, mode: os.FileMode(436), modTime: time.Unix(1616684356, 0)}
a := &asset{bytes: bytes, info: info}
return a, nil
}
@@ -147,7 +147,7 @@ func commandAssetsExampleNomad() (*asset, error) {
return nil, err
}
info := bindataFileInfo{name: "command/assets/example.nomad", size: 16057, mode: os.FileMode(436), modTime: time.Unix(1612560436, 0)}
info := bindataFileInfo{name: "command/assets/example.nomad", size: 16057, mode: os.FileMode(436), modTime: time.Unix(1616684356, 0)}
a := &asset{bytes: bytes, info: info}
return a, nil
}

View File

@@ -157,7 +157,7 @@ func (c *JobRunCommand) Name() string { return "job run" }
func (c *JobRunCommand) Run(args []string) int {
var detach, verbose, output, override, preserveCounts bool
var checkIndexStr, consulToken, vaultToken, vaultNamespace string
var checkIndexStr, consulToken, consulNamespace, vaultToken, vaultNamespace string
var varArgs, varFiles flaghelper.StringFlag
flagSet := c.Meta.FlagSet(c.Name(), FlagSetClient)
@@ -170,6 +170,7 @@ func (c *JobRunCommand) Run(args []string) int {
flagSet.BoolVar(&c.JobGetter.hcl1, "hcl1", false, "")
flagSet.StringVar(&checkIndexStr, "check-index", "", "")
flagSet.StringVar(&consulToken, "consul-token", "", "")
flagSet.StringVar(&consulNamespace, "consul-namespace", "", "")
flagSet.StringVar(&vaultToken, "vault-token", "", "")
flagSet.StringVar(&vaultNamespace, "vault-namespace", "", "")
flagSet.Var(&varArgs, "var", "")
@@ -232,6 +233,10 @@ func (c *JobRunCommand) Run(args []string) int {
job.ConsulToken = helper.StringToPtr(consulToken)
}
if consulNamespace != "" {
job.ConsulNamespace = helper.StringToPtr(consulNamespace)
}
// Parse the Vault token
if vaultToken == "" {
// Check the environment variable