mirror of
https://github.com/kemko/nomad.git
synced 2026-01-01 16:05:42 +03:00
Hash fields used in task service IDs
Fixes #3620 Previously we concatenated tags into task service IDs. This could break deregistration of tag names that contained double //s like some Fabio tags. This change breaks service ID backward compatibility so on upgrade all users services and checks will be removed and re-added with new IDs. This change has the side effect of including all service fields in the ID's hash, so we no longer have to track PortLabel and AddressMode changes independently.
This commit is contained in:
@@ -1,6 +1,9 @@
|
||||
## 0.7.1 (Unreleased)
|
||||
|
||||
__BACKWARDS INCOMPATIBILITIES:__
|
||||
* client: The format of service IDs in Consul has changed. If you rely upon
|
||||
Nomad's service IDs (*not* service names; those are stable), you will need
|
||||
to update your code. [GH-3632]
|
||||
* config: Nomad no longer parses Atlas configuration stanzas. Atlas has been
|
||||
deprecated since earlier this year. If you have an Atlas stanza in your
|
||||
config file it will have to be removed.
|
||||
@@ -57,6 +60,8 @@ BUG FIXES:
|
||||
explicitly [GH-3520]
|
||||
* cli: Fix passing Consul address via flags [GH-3504]
|
||||
* cli: Fix panic when running `keyring` commands [GH-3509]
|
||||
* client: Fix advertising services with tags that require URL escaping
|
||||
[GH-3632]
|
||||
* client: Fix a panic when restoring an allocation with a dead leader task
|
||||
[GH-3502]
|
||||
* client: Fix crash when following logs from a Windows node [GH-3608]
|
||||
|
||||
@@ -2,7 +2,10 @@ package consul
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/sha1"
|
||||
"encoding/base32"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"net"
|
||||
"net/url"
|
||||
@@ -21,10 +24,14 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
// nomadServicePrefix is the first prefix that scopes all Nomad registered
|
||||
// services
|
||||
// nomadServicePrefix is the prefix that scopes all Nomad registered
|
||||
// services (both agent and task entries).
|
||||
nomadServicePrefix = "_nomad"
|
||||
|
||||
// nomadTaskPrefix is the prefix that scopes Nomad registered services
|
||||
// for tasks.
|
||||
nomadTaskPrefix = nomadServicePrefix + "-task-"
|
||||
|
||||
// defaultRetryInterval is how quickly to retry syncing services and
|
||||
// checks to Consul when an error occurs. Will backoff up to a max.
|
||||
defaultRetryInterval = time.Second
|
||||
@@ -288,8 +295,13 @@ func (c *ServiceClient) Run() {
|
||||
|
||||
if err := c.sync(); err != nil {
|
||||
if failures == 0 {
|
||||
// Log on the first failure
|
||||
c.logger.Printf("[WARN] consul.sync: failed to update services in Consul: %v", err)
|
||||
} else if failures%10 == 0 {
|
||||
// Log every 10th consecutive failure
|
||||
c.logger.Printf("[ERR] consul.sync: still unable to update services in Consul after %d failures; latest error: %v", failures, err)
|
||||
}
|
||||
|
||||
failures++
|
||||
if !retryTimer.Stop() {
|
||||
// Timer already expired, since the timer may
|
||||
@@ -389,8 +401,14 @@ func (c *ServiceClient) sync() error {
|
||||
// Not managed by Nomad, skip
|
||||
continue
|
||||
}
|
||||
|
||||
// Unknown Nomad managed service; kill
|
||||
if err := c.client.ServiceDeregister(id); err != nil {
|
||||
if isOldNomadService(id) {
|
||||
// Don't hard-fail on old entries. See #3620
|
||||
continue
|
||||
}
|
||||
|
||||
metrics.IncrCounter([]string{"client", "consul", "sync_failure"}, 1)
|
||||
return err
|
||||
}
|
||||
@@ -398,29 +416,16 @@ func (c *ServiceClient) sync() error {
|
||||
metrics.IncrCounter([]string{"client", "consul", "service_deregistrations"}, 1)
|
||||
}
|
||||
|
||||
// Track services whose ports have changed as their checks may also
|
||||
// need updating
|
||||
portsChanged := make(map[string]struct{}, len(c.services))
|
||||
|
||||
// Add Nomad services missing from Consul
|
||||
for id, locals := range c.services {
|
||||
if remotes, ok := consulServices[id]; ok {
|
||||
// Make sure Port and Address are stable since
|
||||
// PortLabel and AddressMode aren't included in the
|
||||
// service ID.
|
||||
if locals.Port == remotes.Port && locals.Address == remotes.Address {
|
||||
// Already exists in Consul; skip
|
||||
continue
|
||||
if _, ok := consulServices[id]; !ok {
|
||||
if err = c.client.ServiceRegister(locals); err != nil {
|
||||
metrics.IncrCounter([]string{"client", "consul", "sync_failure"}, 1)
|
||||
return err
|
||||
}
|
||||
// Port changed, reregister it and its checks
|
||||
portsChanged[id] = struct{}{}
|
||||
sreg++
|
||||
metrics.IncrCounter([]string{"client", "consul", "service_registrations"}, 1)
|
||||
}
|
||||
if err = c.client.ServiceRegister(locals); err != nil {
|
||||
metrics.IncrCounter([]string{"client", "consul", "sync_failure"}, 1)
|
||||
return err
|
||||
}
|
||||
sreg++
|
||||
metrics.IncrCounter([]string{"client", "consul", "service_registrations"}, 1)
|
||||
}
|
||||
|
||||
// Remove Nomad checks in Consul but unknown locally
|
||||
@@ -433,8 +438,14 @@ func (c *ServiceClient) sync() error {
|
||||
// Service not managed by Nomad, skip
|
||||
continue
|
||||
}
|
||||
// Unknown Nomad managed check; kill
|
||||
|
||||
// Unknown Nomad managed check; remove
|
||||
if err := c.client.CheckDeregister(id); err != nil {
|
||||
if isOldNomadService(check.ServiceID) {
|
||||
// Don't hard-fail on old entries.
|
||||
continue
|
||||
}
|
||||
|
||||
metrics.IncrCounter([]string{"client", "consul", "sync_failure"}, 1)
|
||||
return err
|
||||
}
|
||||
@@ -444,12 +455,11 @@ func (c *ServiceClient) sync() error {
|
||||
|
||||
// Add Nomad checks missing from Consul
|
||||
for id, check := range c.checks {
|
||||
if check, ok := consulChecks[id]; ok {
|
||||
if _, changed := portsChanged[check.ServiceID]; !changed {
|
||||
// Already in Consul and ports didn't change; skipping
|
||||
continue
|
||||
}
|
||||
if _, ok := consulChecks[id]; ok {
|
||||
// Already in Consul; skipping
|
||||
continue
|
||||
}
|
||||
|
||||
if err := c.client.CheckRegister(check); err != nil {
|
||||
metrics.IncrCounter([]string{"client", "consul", "sync_failure"}, 1)
|
||||
return err
|
||||
@@ -751,6 +761,9 @@ func (c *ServiceClient) UpdateTask(allocID string, existing, newTask *structs.Ta
|
||||
continue
|
||||
}
|
||||
|
||||
// Service exists and hasn't changed, don't re-add it later
|
||||
delete(newIDs, existingID)
|
||||
|
||||
// Service still exists so add it to the task's registration
|
||||
sreg := &ServiceRegistration{
|
||||
serviceID: existingID,
|
||||
@@ -758,15 +771,7 @@ func (c *ServiceClient) UpdateTask(allocID string, existing, newTask *structs.Ta
|
||||
}
|
||||
taskReg.Services[existingID] = sreg
|
||||
|
||||
// PortLabel and AddressMode aren't included in the ID, so we
|
||||
// have to compare manually.
|
||||
serviceUnchanged := newSvc.PortLabel == existingSvc.PortLabel && newSvc.AddressMode == existingSvc.AddressMode
|
||||
if serviceUnchanged {
|
||||
// Service exists and hasn't changed, don't add it later
|
||||
delete(newIDs, existingID)
|
||||
}
|
||||
|
||||
// See what checks were updated
|
||||
// See if any checks were updated
|
||||
existingChecks := make(map[string]*structs.ServiceCheck, len(existingSvc.Checks))
|
||||
for _, check := range existingSvc.Checks {
|
||||
existingChecks[makeCheckID(existingID, check)] = check
|
||||
@@ -779,17 +784,16 @@ func (c *ServiceClient) UpdateTask(allocID string, existing, newTask *structs.Ta
|
||||
// Check exists, so don't remove it
|
||||
delete(existingChecks, checkID)
|
||||
sreg.checkIDs[checkID] = struct{}{}
|
||||
} else if serviceUnchanged {
|
||||
// New check on an unchanged service; add them now
|
||||
newCheckIDs, err := c.checkRegs(ops, allocID, existingID, newSvc, newTask, exec, net)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
for _, checkID := range newCheckIDs {
|
||||
sreg.checkIDs[checkID] = struct{}{}
|
||||
// New check on an unchanged service; add them now
|
||||
newCheckIDs, err := c.checkRegs(ops, allocID, existingID, newSvc, newTask, exec, net)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
}
|
||||
for _, checkID := range newCheckIDs {
|
||||
sreg.checkIDs[checkID] = struct{}{}
|
||||
|
||||
}
|
||||
|
||||
@@ -999,36 +1003,40 @@ func (c *ServiceClient) removeTaskRegistration(allocID, taskName string) {
|
||||
//
|
||||
// Agent service IDs are of the form:
|
||||
//
|
||||
// {nomadServicePrefix}-{ROLE}-{Service.Name}-{Service.Tags...}
|
||||
// Example Server ID: _nomad-server-nomad-serf
|
||||
// Example Client ID: _nomad-client-nomad-client-http
|
||||
// {nomadServicePrefix}-{ROLE}-b32(sha1({Service.Name}-{Service.Tags...})
|
||||
// Example Server ID: _nomad-server-FBBK265QN4TMT25ND4EP42TJVMYJ3HR4
|
||||
// Example Client ID: _nomad-client-GGNJPGL7YN7RGMVXZILMPVRZZVRSZC7L
|
||||
//
|
||||
func makeAgentServiceID(role string, service *structs.Service) string {
|
||||
parts := make([]string, len(service.Tags)+3)
|
||||
parts[0] = nomadServicePrefix
|
||||
parts[1] = role
|
||||
parts[2] = service.Name
|
||||
copy(parts[3:], service.Tags)
|
||||
return strings.Join(parts, "-")
|
||||
h := sha1.New()
|
||||
io.WriteString(h, service.Name)
|
||||
for _, tag := range service.Tags {
|
||||
io.WriteString(h, tag)
|
||||
}
|
||||
b32 := base32.StdEncoding.EncodeToString(h.Sum(nil))
|
||||
return fmt.Sprintf("%s-%s-%s", nomadServicePrefix, role, b32)
|
||||
}
|
||||
|
||||
// makeTaskServiceID creates a unique ID for identifying a task service in
|
||||
// Consul.
|
||||
//
|
||||
// Task service IDs are of the form:
|
||||
//
|
||||
// {nomadServicePrefix}-executor-{ALLOC_ID}-{Service.Name}-{Service.Tags...}
|
||||
// Example Service ID: _nomad-executor-1234-echo-http-tag1-tag2-tag3
|
||||
// Consul. All structs.Service fields are included in the ID's hash except
|
||||
// Checks. This allows updates to merely compare IDs.
|
||||
//
|
||||
// Example Service ID: _nomad-task-TNM333JKJPM5AK4FAS3VXQLXFDWOF4VH
|
||||
func makeTaskServiceID(allocID, taskName string, service *structs.Service) string {
|
||||
parts := make([]string, len(service.Tags)+5)
|
||||
parts[0] = nomadServicePrefix
|
||||
parts[1] = "executor"
|
||||
parts[2] = allocID
|
||||
parts[3] = taskName
|
||||
parts[4] = service.Name
|
||||
copy(parts[5:], service.Tags)
|
||||
return strings.Join(parts, "-")
|
||||
h := sha1.New()
|
||||
io.WriteString(h, allocID)
|
||||
io.WriteString(h, taskName)
|
||||
io.WriteString(h, service.Name)
|
||||
io.WriteString(h, service.PortLabel)
|
||||
io.WriteString(h, service.AddressMode)
|
||||
for _, tag := range service.Tags {
|
||||
io.WriteString(h, tag)
|
||||
}
|
||||
|
||||
// Base32 is used for encoding the hash as sha1 hashes can always be
|
||||
// encoded without padding, only 4 bytes larger than base64, and saves
|
||||
// 8 bytes vs hex.
|
||||
return nomadTaskPrefix + base32.StdEncoding.EncodeToString(h.Sum(nil))
|
||||
}
|
||||
|
||||
// makeCheckID creates a unique ID for a check.
|
||||
@@ -1084,9 +1092,21 @@ func createCheckReg(serviceID, checkID string, check *structs.ServiceCheck, host
|
||||
}
|
||||
|
||||
// isNomadService returns true if the ID matches the pattern of a Nomad managed
|
||||
// service. Agent services return false as independent client and server agents
|
||||
// may be running on the same machine. #2827
|
||||
// service (new or old formats). Agent services return false as independent
|
||||
// client and server agents may be running on the same machine. #2827
|
||||
func isNomadService(id string) bool {
|
||||
return strings.HasPrefix(id, nomadTaskPrefix) || isOldNomadService(id)
|
||||
}
|
||||
|
||||
// isOldNomadService returns true if the ID matches an old pattern managed by
|
||||
// Nomad.
|
||||
//
|
||||
// Pre-0.7.1 task service IDs are of the form:
|
||||
//
|
||||
// {nomadServicePrefix}-executor-{ALLOC_ID}-{Service.Name}-{Service.Tags...}
|
||||
// Example Service ID: _nomad-executor-1234-echo-http-tag1-tag2-tag3
|
||||
//
|
||||
func isOldNomadService(id string) bool {
|
||||
const prefix = nomadServicePrefix + "-executor"
|
||||
return strings.HasPrefix(id, prefix)
|
||||
}
|
||||
|
||||
@@ -116,7 +116,12 @@ func TestConsul_Integration(t *testing.T) {
|
||||
{
|
||||
Name: "httpd2",
|
||||
PortLabel: "http",
|
||||
Tags: []string{"test", "http2"},
|
||||
Tags: []string{
|
||||
"test",
|
||||
// Use URL-unfriendly tags to test #3620
|
||||
"public-test.ettaviation.com:80/ redirect=302,https://test.ettaviation.com",
|
||||
"public-test.ettaviation.com:443/",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
@@ -220,8 +220,8 @@ func TestConsul_ChangeTags(t *testing.T) {
|
||||
}
|
||||
|
||||
// TestConsul_ChangePorts asserts that changing the ports on a service updates
|
||||
// it in Consul. Since ports are not part of the service ID this is a slightly
|
||||
// different code path than changing tags.
|
||||
// it in Consul. Pre-0.7.1 ports were not part of the service ID and this was a
|
||||
// slightly different code path than changing tags.
|
||||
func TestConsul_ChangePorts(t *testing.T) {
|
||||
ctx := setupFake()
|
||||
ctx.Task.Services[0].Checks = []*structs.ServiceCheck{
|
||||
@@ -349,8 +349,8 @@ func TestConsul_ChangePorts(t *testing.T) {
|
||||
}
|
||||
|
||||
for k, v := range ctx.FakeConsul.services {
|
||||
if k != origServiceKey {
|
||||
t.Errorf("unexpected key change; was: %q -- but found %q", origServiceKey, k)
|
||||
if k == origServiceKey {
|
||||
t.Errorf("expected key change; still: %q", k)
|
||||
}
|
||||
if v.Name != ctx.Task.Services[0].Name {
|
||||
t.Errorf("expected Name=%q != %q", ctx.Task.Services[0].Name, v.Name)
|
||||
@@ -370,15 +370,15 @@ func TestConsul_ChangePorts(t *testing.T) {
|
||||
for k, v := range ctx.FakeConsul.checks {
|
||||
switch v.Name {
|
||||
case "c1":
|
||||
if k != origTCPKey {
|
||||
t.Errorf("unexpected key change for %s from %q to %q", v.Name, origTCPKey, k)
|
||||
if k == origTCPKey {
|
||||
t.Errorf("expected key change for %s from %q", v.Name, origTCPKey)
|
||||
}
|
||||
if expected := fmt.Sprintf(":%d", xPort); v.TCP != expected {
|
||||
t.Errorf("expected Port x=%v but found: %v", expected, v.TCP)
|
||||
}
|
||||
case "c2":
|
||||
if k != origScriptKey {
|
||||
t.Errorf("unexpected key change for %s from %q to %q", v.Name, origScriptKey, k)
|
||||
if k == origScriptKey {
|
||||
t.Errorf("expected key change for %s from %q", v.Name, origScriptKey)
|
||||
}
|
||||
select {
|
||||
case <-ctx.execs:
|
||||
@@ -1383,9 +1383,16 @@ func TestIsNomadService(t *testing.T) {
|
||||
}{
|
||||
{"_nomad-client-nomad-client-http", false},
|
||||
{"_nomad-server-nomad-serf", false},
|
||||
|
||||
// Pre-0.7.1 style IDs still match
|
||||
{"_nomad-executor-abc", true},
|
||||
{"_nomad-executor", true},
|
||||
|
||||
// Post-0.7.1 style IDs match
|
||||
{"_nomad-task-FBBK265QN4TMT25ND4EP42TJVMYJ3HR4", true},
|
||||
|
||||
{"not-nomad", false},
|
||||
{"_nomad", false},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
|
||||
@@ -3140,17 +3140,6 @@ func (s *Service) ValidateName(name string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Hash calculates the hash of the check based on it's content and the service
|
||||
// which owns it
|
||||
func (s *Service) Hash() string {
|
||||
h := sha1.New()
|
||||
io.WriteString(h, s.Name)
|
||||
io.WriteString(h, strings.Join(s.Tags, ""))
|
||||
io.WriteString(h, s.PortLabel)
|
||||
io.WriteString(h, s.AddressMode)
|
||||
return fmt.Sprintf("%x", h.Sum(nil))
|
||||
}
|
||||
|
||||
const (
|
||||
// DefaultKillTimeout is the default timeout between signaling a task it
|
||||
// will be killed and killing it.
|
||||
|
||||
Reference in New Issue
Block a user