Merge pull request #14546 from hashicorp/f-refactor-check-watcher

client: refactor check watcher to be reusable
Authored by Seth Hoenig on 2022-09-13 07:32:32 -05:00, committed by GitHub
6 changed files with 477 additions and 875 deletions


@@ -0,0 +1,309 @@
package serviceregistration
import (
"context"
"fmt"
"time"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-set"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/structs"
)
// composite of allocID + taskName for uniqueness
type key string
type restarter struct {
allocID string
taskName string
checkID string
checkName string
taskKey key
logger hclog.Logger
task WorkloadRestarter
grace time.Duration
interval time.Duration
timeLimit time.Duration
ignoreWarnings bool
// unhealthyState is the time a check first went unhealthy. Set to the
// zero value if the check passes before timeLimit.
unhealthyState time.Time
// graceUntil is when the check's grace period expires and unhealthy
// checks should be counted.
graceUntil time.Time
}
// apply restart state for check and restart task if necessary. Current
// timestamp is passed in so all check updates have the same view of time (and
// to ease testing).
//
// Returns true if a restart was triggered in which case this check should be
// removed (checks are added on task startup).
func (r *restarter) apply(ctx context.Context, now time.Time, status string) bool {
healthy := func() {
if !r.unhealthyState.IsZero() {
r.logger.Debug("canceling restart because check became healthy")
r.unhealthyState = time.Time{}
}
}
switch status {
case "critical":
case "warning":
if r.ignoreWarnings {
// Warnings are ignored, reset state and exit
healthy()
return false
}
default:
// All other statuses are ok, reset state and exit
healthy()
return false
}
if now.Before(r.graceUntil) {
// In grace period, exit
return false
}
if r.unhealthyState.IsZero() {
// First failure, set restart deadline
if r.timeLimit != 0 {
r.logger.Debug("check became unhealthy. Will restart if check doesn't become healthy", "time_limit", r.timeLimit)
}
r.unhealthyState = now
}
// restart timeLimit after start of this check becoming unhealthy
restartAt := r.unhealthyState.Add(r.timeLimit)
// Must test >= because if limit=1, restartAt == first failure
if now.Equal(restartAt) || now.After(restartAt) {
// hasn't become healthy by deadline, restart!
r.logger.Debug("restarting due to unhealthy check")
// Tell TaskRunner to restart due to failure
reason := fmt.Sprintf("healthcheck: check %q unhealthy", r.checkName)
event := structs.NewTaskEvent(structs.TaskRestartSignal).SetRestartReason(reason)
go asyncRestart(ctx, r.logger, r.task, event)
return true
}
return false
}
// asyncRestart mimics the pre-0.9 TaskRunner.Restart behavior and is intended
// to be called in a goroutine.
func asyncRestart(ctx context.Context, logger hclog.Logger, task WorkloadRestarter, event *structs.TaskEvent) {
// Check watcher restarts are always failures
const failure = true
// Restarting is asynchronous so there's no reason to allow this
// goroutine to block indefinitely.
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
if err := task.Restart(ctx, event, failure); err != nil {
// Restart errors are not actionable and only relevant when
// debugging allocation lifecycle management.
logger.Debug("failed to restart task", "error", err, "event_time", event.Time, "event_type", event.Type)
}
}
type CheckStatus struct {
Status string // consider failure vs. critical, warnings, etc.
}
// CheckStatusGetter is implemented per-provider.
type CheckStatusGetter interface {
// Get returns a map from CheckID -> (minimal) CheckStatus
Get() (map[string]CheckStatus, error)
}
// checkWatchUpdate adds or removes a check from the watcher
type checkWatchUpdate struct {
checkID string
remove bool
restart *restarter
}
type CheckWatcher struct {
logger hclog.Logger
getter CheckStatusGetter
// pollFrequency is how often to poll the checks API
pollFrequency time.Duration
// checkUpdateCh sends watches/removals to the main loop
checkUpdateCh chan checkWatchUpdate
// done is closed when Run has exited
done chan struct{}
// failedPreviousInterval is used to indicate whether something went wrong during
// the previous poll interval - if so we can silence ongoing errors
failedPreviousInterval bool
}
func NewCheckWatcher(logger hclog.Logger, getter CheckStatusGetter) *CheckWatcher {
return &CheckWatcher{
logger: logger.ResetNamed("watch.checks"),
getter: getter,
pollFrequency: 1 * time.Second,
checkUpdateCh: make(chan checkWatchUpdate, 8),
done: make(chan struct{}),
}
}
// Watch a check and restart its task if unhealthy.
func (w *CheckWatcher) Watch(allocID, taskName, checkID string, check *structs.ServiceCheck, wr WorkloadRestarter) {
if !check.TriggersRestarts() {
return // check_restart not set; no-op
}
c := &restarter{
allocID: allocID,
taskName: taskName,
checkID: checkID,
checkName: check.Name,
taskKey: key(allocID + taskName),
task: wr,
interval: check.Interval,
grace: check.CheckRestart.Grace,
graceUntil: time.Now().Add(check.CheckRestart.Grace),
timeLimit: check.Interval * time.Duration(check.CheckRestart.Limit-1),
ignoreWarnings: check.CheckRestart.IgnoreWarnings,
logger: w.logger.With("alloc_id", allocID, "task", taskName, "check", check.Name),
}
select {
case w.checkUpdateCh <- checkWatchUpdate{
checkID: checkID,
restart: c,
}: // activate watch
case <-w.done: // exited; nothing to do
}
}
// Unwatch a check.
func (w *CheckWatcher) Unwatch(checkID string) {
select {
case w.checkUpdateCh <- checkWatchUpdate{
checkID: checkID,
remove: true,
}: // deactivate watch
case <-w.done: // exited; nothing to do
}
}
func (w *CheckWatcher) Run(ctx context.Context) {
defer close(w.done)
// map of checkID to its restarter handle (contains only checks we are watching)
watched := make(map[string]*restarter)
checkTimer, cleanupCheckTimer := helper.NewSafeTimer(0)
defer cleanupCheckTimer()
stopCheckTimer := func() { // todo: refactor using that other pattern
checkTimer.Stop()
select {
case <-checkTimer.C:
default:
}
}
// initialize with checkTimer disabled
stopCheckTimer()
for {
// disable polling if there are no checks
if len(watched) == 0 {
stopCheckTimer()
}
select {
// caller cancelled us; goodbye
case <-ctx.Done():
return
// received an update; add or remove check
case update := <-w.checkUpdateCh:
if update.remove {
delete(watched, update.checkID)
continue
}
watched[update.checkID] = update.restart
allocID := update.restart.allocID
taskName := update.restart.taskName
checkName := update.restart.checkName
w.logger.Trace("now watching check", "alloc_i", allocID, "task", taskName, "check", checkName)
// turn on the timer if we are now active
if len(watched) == 1 {
stopCheckTimer()
checkTimer.Reset(w.pollFrequency)
}
// poll time; refresh check statuses
case now := <-checkTimer.C:
w.interval(ctx, now, watched)
checkTimer.Reset(w.pollFrequency)
}
}
}
func (w *CheckWatcher) interval(ctx context.Context, now time.Time, watched map[string]*restarter) {
statuses, err := w.getter.Get()
if err != nil && !w.failedPreviousInterval {
w.failedPreviousInterval = true
w.logger.Error("failed to retrieve check statuses", "error", err)
return
}
w.failedPreviousInterval = false
// keep track of tasks restarted this interval
restarts := set.New[key](len(statuses))
// iterate over status of all checks, and update the status of checks
// we care about watching
for checkID, checkRestarter := range watched {
if ctx.Err() != nil {
return // short circuit; caller cancelled us
}
if restarts.Contains(checkRestarter.taskKey) {
// skip; task is already being restarted
delete(watched, checkID)
continue
}
status, exists := statuses[checkID]
if !exists {
// warn only if outside grace period; avoiding race with check registration
if now.After(checkRestarter.graceUntil) {
w.logger.Warn("watched check not found", "check_id", checkID)
}
continue
}
if checkRestarter.apply(ctx, now, status.Status) {
// check will be re-registered & re-watched on startup
delete(watched, checkID)
restarts.Insert(checkRestarter.taskKey)
}
}
// purge passing checks of tasks that are being restarted
if restarts.Size() > 0 {
for checkID, checkRestarter := range watched {
if restarts.Contains(checkRestarter.taskKey) {
delete(watched, checkID)
}
}
}
}
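Taken together, the new package reduces the provider contract to a CheckStatusGetter plus a per-task WorkloadRestarter. The following is a rough wiring sketch, not part of the PR: staticGetter and noopRestarter are hypothetical stand-ins, the import path is the one the old Consul watcher (further down in this diff) already uses, and WorkloadRestarter is assumed to require only Restart, as the test fake suggests.

package main

import (
	"context"
	"time"

	"github.com/hashicorp/go-hclog"
	"github.com/hashicorp/nomad/client/serviceregistration"
	"github.com/hashicorp/nomad/nomad/structs"
)

// staticGetter is a stand-in CheckStatusGetter; a real provider would query
// its own checks API here (compare the Consul checkStatusGetter below).
type staticGetter struct{}

func (staticGetter) Get() (map[string]serviceregistration.CheckStatus, error) {
	return map[string]serviceregistration.CheckStatus{
		"check-1": {Status: "critical"},
	}, nil
}

// noopRestarter stands in for the task runner, which normally fulfills the
// WorkloadRestarter interface.
type noopRestarter struct{}

func (noopRestarter) Restart(context.Context, *structs.TaskEvent, bool) error { return nil }

func main() {
	watcher := serviceregistration.NewCheckWatcher(hclog.Default(), staticGetter{})

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	go watcher.Run(ctx) // polls getter.Get() once per pollFrequency

	// Watch is a no-op unless check_restart is configured on the check.
	check := &structs.ServiceCheck{
		Name:         "check-1",
		Type:         "tcp",
		Interval:     time.Second,
		Timeout:      time.Second,
		CheckRestart: &structs.CheckRestart{Limit: 3, Grace: time.Second},
	}
	watcher.Watch("alloc-1", "task-1", "check-1", check, noopRestarter{})

	<-ctx.Done()
}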


@@ -1,4 +1,4 @@
package consul
package serviceregistration
import (
"context"
@@ -7,30 +7,29 @@ import (
"testing"
"time"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/require"
"github.com/shoenig/test/must"
)
// checkRestartRecord is used by a testFakeCtx to record when restarts occur
// restartRecord is used by a fakeWorkloadRestarter to record when restarts occur
// due to a watched check.
type checkRestartRecord struct {
type restartRecord struct {
timestamp time.Time
source string
reason string
failure bool
}
// fakeCheckRestarter is a test implementation of TaskRestarter.
type fakeCheckRestarter struct {
// fakeWorkloadRestarter is a test implementation of TaskRestarter.
type fakeWorkloadRestarter struct {
// restarts is a slice of all of the restarts triggered by the checkWatcher
restarts []checkRestartRecord
restarts []restartRecord
// need the checkWatcher to re-Watch restarted tasks like TaskRunner
watcher *checkWatcher
watcher *CheckWatcher
// check to re-Watch on restarts
check *structs.ServiceCheck
@@ -38,13 +37,12 @@ type fakeCheckRestarter struct {
taskName string
checkName string
mu sync.Mutex
lock sync.Mutex
}
// newFakeCheckRestart creates a new TaskRestarter. It needs all of the
// parameters checkWatcher.Watch expects.
func newFakeCheckRestarter(w *checkWatcher, allocID, taskName, checkName string, c *structs.ServiceCheck) *fakeCheckRestarter {
return &fakeCheckRestarter{
// newFakeCheckRestart creates a new mock WorkloadRestarter.
func newFakeWorkloadRestarter(w *CheckWatcher, allocID, taskName, checkName string, c *structs.ServiceCheck) *fakeWorkloadRestarter {
return &fakeWorkloadRestarter{
watcher: w,
check: c,
allocID: allocID,
@@ -53,14 +51,15 @@ func newFakeCheckRestarter(w *checkWatcher, allocID, taskName, checkName string,
}
}
// Restart implements part of the TaskRestarter interface needed for check
// watching and is normally fulfilled by a TaskRunner.
// Restart implements part of the TaskRestarter interface needed for check watching
// and is normally fulfilled by a TaskRunner.
//
// Restarts are recorded in the []restarts field and re-Watch the check.
func (c *fakeCheckRestarter) Restart(ctx context.Context, event *structs.TaskEvent, failure bool) error {
c.mu.Lock()
defer c.mu.Unlock()
restart := checkRestartRecord{
func (c *fakeWorkloadRestarter) Restart(_ context.Context, event *structs.TaskEvent, failure bool) error {
c.lock.Lock()
defer c.lock.Unlock()
restart := restartRecord{
timestamp: time.Now(),
source: event.Type,
reason: event.DisplayMessage,
@@ -73,10 +72,10 @@ func (c *fakeCheckRestarter) Restart(ctx context.Context, event *structs.TaskEve
return nil
}
// String for debugging
func (c *fakeCheckRestarter) String() string {
c.mu.Lock()
defer c.mu.Unlock()
// String is useful for debugging.
func (c *fakeWorkloadRestarter) String() string {
c.lock.Lock()
defer c.lock.Unlock()
s := fmt.Sprintf("%s %s %s restarts:\n", c.allocID, c.taskName, c.checkName)
for _, r := range c.restarts {
@@ -85,76 +84,55 @@ func (c *fakeCheckRestarter) String() string {
return s
}
// GetRestarts for testing in a threadsafe way
func (c *fakeCheckRestarter) GetRestarts() []checkRestartRecord {
c.mu.Lock()
defer c.mu.Unlock()
// GetRestarts for testing in a thread-safe way
func (c *fakeWorkloadRestarter) GetRestarts() []restartRecord {
c.lock.Lock()
defer c.lock.Unlock()
o := make([]checkRestartRecord, len(c.restarts))
o := make([]restartRecord, len(c.restarts))
copy(o, c.restarts)
return o
}
// checkResponse is a response returned by the fakeChecksAPI after the given
// time.
type checkResponse struct {
// response is a response returned by fakeCheckStatusGetter after a certain time
type response struct {
at time.Time
id string
status string
}
// fakeChecksAPI implements the Checks() method for testing Consul.
type fakeChecksAPI struct {
// responses is a map of check ids to their status at a particular
// time. checkResponses must be in chronological order.
responses map[string][]checkResponse
mu sync.Mutex
// fakeCheckStatusGetter is a mock implementation of CheckStatusGetter
type fakeCheckStatusGetter struct {
lock sync.Mutex
responses map[string][]response
}
func newFakeChecksAPI() *fakeChecksAPI {
return &fakeChecksAPI{responses: make(map[string][]checkResponse)}
}
func (g *fakeCheckStatusGetter) Get() (map[string]CheckStatus, error) {
g.lock.Lock()
defer g.lock.Unlock()
// add a new check status to Consul at the given time.
func (c *fakeChecksAPI) add(id, status string, at time.Time) {
c.mu.Lock()
c.responses[id] = append(c.responses[id], checkResponse{at, id, status})
c.mu.Unlock()
}
func (c *fakeChecksAPI) ChecksWithFilterOpts(filter string, opts *api.QueryOptions) (map[string]*api.AgentCheck, error) {
c.mu.Lock()
defer c.mu.Unlock()
now := time.Now()
result := make(map[string]*api.AgentCheck, len(c.responses))
// Use the latest response for each check
for k, vs := range c.responses {
result := make(map[string]CheckStatus)
// use the newest response at or before now
for k, vs := range g.responses {
for _, v := range vs {
if v.at.After(now) {
break
}
result[k] = &api.AgentCheck{
CheckID: k,
Name: k,
Status: v.status,
}
result[k] = CheckStatus{Status: v.status}
}
}
return result, nil
}
// testWatcherSetup sets up a fakeChecksAPI and a real checkWatcher with a test
// logger and faster poll frequency.
func testWatcherSetup(t *testing.T) (*fakeChecksAPI, *checkWatcher) {
logger := testlog.HCLogger(t)
checksAPI := newFakeChecksAPI()
namespacesClient := NewNamespacesClient(NewMockNamespaces(nil), NewMockAgent(ossFeatures))
cw := newCheckWatcher(logger, checksAPI, namespacesClient)
cw.pollFreq = 10 * time.Millisecond
return checksAPI, cw
func (g *fakeCheckStatusGetter) add(checkID, status string, at time.Time) {
g.lock.Lock()
defer g.lock.Unlock()
if g.responses == nil {
g.responses = make(map[string][]response)
}
g.responses[checkID] = append(g.responses[checkID], response{at, checkID, status})
}
func testCheck() *structs.ServiceCheck {
@@ -170,8 +148,22 @@ func testCheck() *structs.ServiceCheck {
}
}
// TestCheckWatcher_Skip asserts unwatched checks are ignored.
func TestCheckWatcher_Skip(t *testing.T) {
// testWatcherSetup sets up a fakeCheckStatusGetter and a real CheckWatcher with a test
// logger and faster poll frequency.
func testWatcherSetup(t *testing.T) (*fakeCheckStatusGetter, *CheckWatcher) {
logger := testlog.HCLogger(t)
getter := new(fakeCheckStatusGetter)
cw := NewCheckWatcher(logger, getter)
cw.pollFrequency = 10 * time.Millisecond
return getter, cw
}
func before() time.Time {
return time.Now().Add(-10 * time.Second)
}
// TestCheckWatcher_SkipUnwatched asserts unwatched checks are ignored.
func TestCheckWatcher_SkipUnwatched(t *testing.T) {
ci.Parallel(t)
// Create a check with restarting disabled
@@ -179,65 +171,61 @@ func TestCheckWatcher_Skip(t *testing.T) {
check.CheckRestart = nil
logger := testlog.HCLogger(t)
checksAPI := newFakeChecksAPI()
namespacesClient := NewNamespacesClient(NewMockNamespaces(nil), NewMockAgent(ossFeatures))
getter := new(fakeCheckStatusGetter)
cw := newCheckWatcher(logger, checksAPI, namespacesClient)
restarter1 := newFakeCheckRestarter(cw, "testalloc1", "testtask1", "testcheck1", check)
cw := NewCheckWatcher(logger, getter)
restarter1 := newFakeWorkloadRestarter(cw, "testalloc1", "testtask1", "testcheck1", check)
cw.Watch("testalloc1", "testtask1", "testcheck1", check, restarter1)
// Check should have been dropped as it's not watched
if n := len(cw.checkUpdateCh); n != 0 {
t.Fatalf("expected 0 checks to be enqueued for watching but found %d", n)
}
enqueued := len(cw.checkUpdateCh)
must.Zero(t, enqueued, must.Sprintf("expected 0 checks to be enqueued for watching but found %d", enqueued))
}
// TestCheckWatcher_Healthy asserts healthy tasks are not restarted.
func TestCheckWatcher_Healthy(t *testing.T) {
ci.Parallel(t)
fakeAPI, cw := testWatcherSetup(t)
now := before()
getter, cw := testWatcherSetup(t)
// Make both checks healthy from the beginning
getter.add("testcheck1", "passing", now)
getter.add("testcheck2", "passing", now)
check1 := testCheck()
restarter1 := newFakeCheckRestarter(cw, "testalloc1", "testtask1", "testcheck1", check1)
restarter1 := newFakeWorkloadRestarter(cw, "testalloc1", "testtask1", "testcheck1", check1)
cw.Watch("testalloc1", "testtask1", "testcheck1", check1, restarter1)
check2 := testCheck()
check2.CheckRestart.Limit = 1
check2.CheckRestart.Grace = 0
restarter2 := newFakeCheckRestarter(cw, "testalloc2", "testtask2", "testcheck2", check2)
restarter2 := newFakeWorkloadRestarter(cw, "testalloc2", "testtask2", "testcheck2", check2)
cw.Watch("testalloc2", "testtask2", "testcheck2", check2, restarter2)
// Make both checks healthy from the beginning
fakeAPI.add("testcheck1", "passing", time.Time{})
fakeAPI.add("testcheck2", "passing", time.Time{})
// Run
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
cw.Run(ctx)
// Ensure restart was never called
if n := len(restarter1.restarts); n > 0 {
t.Errorf("expected check 1 to not be restarted but found %d:\n%s", n, restarter1)
}
if n := len(restarter2.restarts); n > 0 {
t.Errorf("expected check 2 to not be restarted but found %d:\n%s", n, restarter2)
}
must.Empty(t, restarter1.restarts, must.Sprint("expected check 1 to not be restarted"))
must.Empty(t, restarter2.restarts, must.Sprint("expected check 2 to not be restarted"))
}
// TestCheckWatcher_Unhealthy asserts unhealthy tasks are restarted exactly once.
func TestCheckWatcher_Unhealthy(t *testing.T) {
ci.Parallel(t)
fakeAPI, cw := testWatcherSetup(t)
check1 := testCheck()
restarter1 := newFakeCheckRestarter(cw, "testalloc1", "testtask1", "testcheck1", check1)
cw.Watch("testalloc1", "testtask1", "testcheck1", check1, restarter1)
now := before()
getter, cw := testWatcherSetup(t)
// Check has always been failing
fakeAPI.add("testcheck1", "critical", time.Time{})
getter.add("testcheck1", "critical", now)
check1 := testCheck()
restarter1 := newFakeWorkloadRestarter(cw, "testalloc1", "testtask1", "testcheck1", check1)
cw.Watch("testalloc1", "testtask1", "testcheck1", check1, restarter1)
// Run
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
@@ -245,7 +233,7 @@ func TestCheckWatcher_Unhealthy(t *testing.T) {
cw.Run(ctx)
// Ensure restart was called exactly once
require.Len(t, restarter1.restarts, 1)
must.Len(t, 1, restarter1.restarts, must.Sprint("expected check to be restarted once"))
}
// TestCheckWatcher_HealthyWarning asserts checks in warning with
@@ -253,27 +241,26 @@ func TestCheckWatcher_Unhealthy(t *testing.T) {
func TestCheckWatcher_HealthyWarning(t *testing.T) {
ci.Parallel(t)
fakeAPI, cw := testWatcherSetup(t)
now := before()
getter, cw := testWatcherSetup(t)
// Check is always in warning but that's ok
getter.add("testcheck1", "warning", now)
check1 := testCheck()
check1.CheckRestart.Limit = 1
check1.CheckRestart.Grace = 0
check1.CheckRestart.IgnoreWarnings = true
restarter1 := newFakeCheckRestarter(cw, "testalloc1", "testtask1", "testcheck1", check1)
restarter1 := newFakeWorkloadRestarter(cw, "testalloc1", "testtask1", "testcheck1", check1)
cw.Watch("testalloc1", "testtask1", "testcheck1", check1, restarter1)
// Check is always in warning but that's ok
fakeAPI.add("testcheck1", "warning", time.Time{})
// Run
ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
defer cancel()
cw.Run(ctx)
// Ensure restart was never called on check 1
if n := len(restarter1.restarts); n > 0 {
t.Errorf("expected check 1 to not be restarted but found %d", n)
}
must.Empty(t, restarter1.restarts, must.Sprint("expected check 1 to not be restarted"))
}
// TestCheckWatcher_Flapping asserts checks that flap from healthy to unhealthy
@@ -281,56 +268,53 @@ func TestCheckWatcher_HealthyWarning(t *testing.T) {
func TestCheckWatcher_Flapping(t *testing.T) {
ci.Parallel(t)
fakeAPI, cw := testWatcherSetup(t)
getter, cw := testWatcherSetup(t)
check1 := testCheck()
check1.CheckRestart.Grace = 0
restarter1 := newFakeCheckRestarter(cw, "testalloc1", "testtask1", "testcheck1", check1)
restarter1 := newFakeWorkloadRestarter(cw, "testalloc1", "testtask1", "testcheck1", check1)
cw.Watch("testalloc1", "testtask1", "testcheck1", check1, restarter1)
// Check flaps and is never failing for the full 200ms needed to restart
now := time.Now()
fakeAPI.add("testcheck1", "passing", now)
fakeAPI.add("testcheck1", "critical", now.Add(100*time.Millisecond))
fakeAPI.add("testcheck1", "passing", now.Add(250*time.Millisecond))
fakeAPI.add("testcheck1", "critical", now.Add(300*time.Millisecond))
fakeAPI.add("testcheck1", "passing", now.Add(450*time.Millisecond))
getter.add("testcheck1", "passing", now)
getter.add("testcheck1", "critical", now.Add(100*time.Millisecond))
getter.add("testcheck1", "passing", now.Add(250*time.Millisecond))
getter.add("testcheck1", "critical", now.Add(300*time.Millisecond))
getter.add("testcheck1", "passing", now.Add(450*time.Millisecond))
ctx, cancel := context.WithTimeout(context.Background(), 600*time.Millisecond)
defer cancel()
cw.Run(ctx)
// Ensure restart was never called on check 1
if n := len(restarter1.restarts); n > 0 {
t.Errorf("expected check 1 to not be restarted but found %d\n%s", n, restarter1)
}
must.Empty(t, restarter1.restarts, must.Sprint("expected check 1 to not be restarted"))
}
// TestCheckWatcher_Unwatch asserts unwatching checks prevents restarts.
func TestCheckWatcher_Unwatch(t *testing.T) {
ci.Parallel(t)
fakeAPI, cw := testWatcherSetup(t)
now := before()
getter, cw := testWatcherSetup(t)
// Always failing
getter.add("testcheck1", "critical", now)
// Unwatch immediately
check1 := testCheck()
check1.CheckRestart.Limit = 1
check1.CheckRestart.Grace = 100 * time.Millisecond
restarter1 := newFakeCheckRestarter(cw, "testalloc1", "testtask1", "testcheck1", check1)
restarter1 := newFakeWorkloadRestarter(cw, "testalloc1", "testtask1", "testcheck1", check1)
cw.Watch("testalloc1", "testtask1", "testcheck1", check1, restarter1)
cw.Unwatch("testcheck1")
// Always failing
fakeAPI.add("testcheck1", "critical", time.Time{})
ctx, cancel := context.WithTimeout(context.Background(), 300*time.Millisecond)
defer cancel()
cw.Run(ctx)
// Ensure restart was never called on check 1
if n := len(restarter1.restarts); n > 0 {
t.Errorf("expected check 1 to not be restarted but found %d\n%s", n, restarter1)
}
must.Empty(t, restarter1.restarts, must.Sprint("expected check 1 to not be restarted"))
}
// TestCheckWatcher_MultipleChecks asserts that when there are multiple checks
@@ -339,31 +323,34 @@ func TestCheckWatcher_Unwatch(t *testing.T) {
func TestCheckWatcher_MultipleChecks(t *testing.T) {
ci.Parallel(t)
fakeAPI, cw := testWatcherSetup(t)
getter, cw := testWatcherSetup(t)
// checks 1 and 2 are critical then become passing, check 3 is always passing; should be only 1 net restart
now := time.Now()
getter.add("testcheck1", "critical", before())
getter.add("testcheck1", "passing", now.Add(150*time.Millisecond))
getter.add("testcheck2", "critical", before())
getter.add("testcheck2", "passing", now.Add(150*time.Millisecond))
getter.add("testcheck3", "passing", time.Time{})
check1 := testCheck()
check1.Name = "testcheck1"
check1.CheckRestart.Limit = 1
restarter1 := newFakeCheckRestarter(cw, "testalloc1", "testtask1", "testcheck1", check1)
restarter1 := newFakeWorkloadRestarter(cw, "testalloc1", "testtask1", "testcheck1", check1)
cw.Watch("testalloc1", "testtask1", "testcheck1", check1, restarter1)
check2 := testCheck()
check2.Name = "testcheck2"
check2.CheckRestart.Limit = 1
restarter2 := newFakeCheckRestarter(cw, "testalloc1", "testtask1", "testcheck2", check2)
restarter2 := newFakeWorkloadRestarter(cw, "testalloc1", "testtask1", "testcheck2", check2)
cw.Watch("testalloc1", "testtask1", "testcheck2", check2, restarter2)
check3 := testCheck()
check3.Name = "testcheck3"
check3.CheckRestart.Limit = 1
restarter3 := newFakeCheckRestarter(cw, "testalloc1", "testtask1", "testcheck3", check3)
restarter3 := newFakeWorkloadRestarter(cw, "testalloc1", "testtask1", "testcheck3", check3)
cw.Watch("testalloc1", "testtask1", "testcheck3", check3, restarter3)
// check 2 & 3 fail long enough to cause 1 restart, but only 1 should restart
now := time.Now()
fakeAPI.add("testcheck1", "critical", now)
fakeAPI.add("testcheck1", "passing", now.Add(150*time.Millisecond))
fakeAPI.add("testcheck2", "critical", now)
fakeAPI.add("testcheck2", "passing", now.Add(150*time.Millisecond))
fakeAPI.add("testcheck3", "passing", time.Time{})
// Run
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
@@ -388,17 +375,17 @@ func TestCheckWatcher_MultipleChecks(t *testing.T) {
func TestCheckWatcher_Deadlock(t *testing.T) {
ci.Parallel(t)
fakeAPI, cw := testWatcherSetup(t)
getter, cw := testWatcherSetup(t)
// If TR.Restart blocks, restarting len(checkUpdateCh)+1 checks causes
// a deadlock due to checkWatcher.Run being blocked in
// checkRestart.apply and unable to process updates from the chan!
n := cap(cw.checkUpdateCh) + 1
checks := make([]*structs.ServiceCheck, n)
restarters := make([]*fakeCheckRestarter, n)
restarters := make([]*fakeWorkloadRestarter, n)
for i := 0; i < n; i++ {
c := testCheck()
r := newFakeCheckRestarter(cw,
r := newFakeWorkloadRestarter(cw,
fmt.Sprintf("alloc%d", i),
fmt.Sprintf("task%d", i),
fmt.Sprintf("check%d", i),
@@ -420,7 +407,7 @@ func TestCheckWatcher_Deadlock(t *testing.T) {
// Make them all fail
for _, r := range restarters {
fakeAPI.add(r.checkName, "critical", time.Time{})
getter.add(r.checkName, "critical", time.Time{})
}
// Ensure that restart was called exactly once on all checks
@@ -432,6 +419,6 @@ func TestCheckWatcher_Deadlock(t *testing.T) {
}
return true, nil
}, func(err error) {
require.NoError(t, err)
must.NoError(t, err)
})
}
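A quick worked example of the deadline arithmetic these tests lean on, using the values asserted in the old consul watcher test shown later in this diff (an Interval of 100ms and CheckRestart.Limit of 3); firstUnhealthy below is only an illustrative name, not one from the code.

package main

import (
	"fmt"
	"time"
)

func main() {
	// From Watch(): timeLimit = Interval * (Limit - 1). The -1 accounts for
	// the poll that first observes the failure counting as one strike.
	interval := 100 * time.Millisecond
	limit := 3
	timeLimit := interval * time.Duration(limit-1) // 200ms

	// From apply(): a restart fires once now >= firstUnhealthy + timeLimit
	// (the >= matters so that Limit=1 restarts on the first failing poll).
	firstUnhealthy := time.Now()
	restartAt := firstUnhealthy.Add(timeLimit)

	fmt.Println(timeLimit)                     // 200ms
	fmt.Println(restartAt.Sub(firstUnhealthy)) // 200ms
}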


@@ -1,355 +0,0 @@
package consul
import (
"context"
"fmt"
"time"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/client/serviceregistration"
"github.com/hashicorp/nomad/nomad/structs"
)
const (
// defaultPollFreq is the default rate to poll the Consul Checks API
defaultPollFreq = 900 * time.Millisecond
)
// ChecksAPI is the part of the Consul API the checkWatcher requires.
type ChecksAPI interface {
ChecksWithFilterOpts(filter string, q *api.QueryOptions) (map[string]*api.AgentCheck, error)
}
// checkRestart handles restarting a task if a check is unhealthy.
type checkRestart struct {
allocID string
taskName string
checkID string
checkName string
taskKey string // composite of allocID + taskName for uniqueness
task serviceregistration.WorkloadRestarter
grace time.Duration
interval time.Duration
timeLimit time.Duration
ignoreWarnings bool
// Mutable fields
// unhealthyState is the time a check first went unhealthy. Set to the
// zero value if the check passes before timeLimit.
unhealthyState time.Time
// graceUntil is when the check's grace period expires and unhealthy
// checks should be counted.
graceUntil time.Time
logger hclog.Logger
}
// apply restart state for check and restart task if necessary. Current
// timestamp is passed in so all check updates have the same view of time (and
// to ease testing).
//
// Returns true if a restart was triggered in which case this check should be
// removed (checks are added on task startup).
func (c *checkRestart) apply(ctx context.Context, now time.Time, status string) bool {
healthy := func() {
if !c.unhealthyState.IsZero() {
c.logger.Debug("canceling restart because check became healthy")
c.unhealthyState = time.Time{}
}
}
switch status {
case api.HealthCritical:
case api.HealthWarning:
if c.ignoreWarnings {
// Warnings are ignored, reset state and exit
healthy()
return false
}
default:
// All other statuses are ok, reset state and exit
healthy()
return false
}
if now.Before(c.graceUntil) {
// In grace period, exit
return false
}
if c.unhealthyState.IsZero() {
// First failure, set restart deadline
if c.timeLimit != 0 {
c.logger.Debug("check became unhealthy. Will restart if check doesn't become healthy", "time_limit", c.timeLimit)
}
c.unhealthyState = now
}
// restart timeLimit after start of this check becoming unhealthy
restartAt := c.unhealthyState.Add(c.timeLimit)
// Must test >= because if limit=1, restartAt == first failure
if now.Equal(restartAt) || now.After(restartAt) {
// hasn't become healthy by deadline, restart!
c.logger.Debug("restarting due to unhealthy check")
// Tell TaskRunner to restart due to failure
reason := fmt.Sprintf("healthcheck: check %q unhealthy", c.checkName)
event := structs.NewTaskEvent(structs.TaskRestartSignal).SetRestartReason(reason)
go asyncRestart(ctx, c.logger, c.task, event)
return true
}
return false
}
// asyncRestart mimics the pre-0.9 TaskRunner.Restart behavior and is intended
// to be called in a goroutine.
func asyncRestart(ctx context.Context, logger hclog.Logger, task serviceregistration.WorkloadRestarter, event *structs.TaskEvent) {
// Check watcher restarts are always failures
const failure = true
// Restarting is asynchronous so there's no reason to allow this
// goroutine to block indefinitely.
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
if err := task.Restart(ctx, event, failure); err != nil {
// Restart errors are not actionable and only relevant when
// debugging allocation lifecycle management.
logger.Debug("failed to restart task", "error", err,
"event_time", event.Time, "event_type", event.Type)
}
}
// checkWatchUpdates add or remove checks from the watcher
type checkWatchUpdate struct {
checkID string
remove bool
checkRestart *checkRestart
}
// checkWatcher watches Consul checks and restarts tasks when they're
// unhealthy.
type checkWatcher struct {
namespacesClient *NamespacesClient
checksAPI ChecksAPI
// pollFreq is how often to poll the checks API and defaults to
// defaultPollFreq
pollFreq time.Duration
// checkUpdateCh is how watches (and removals) are sent to the main
// watching loop
checkUpdateCh chan checkWatchUpdate
// done is closed when Run has exited
done chan struct{}
// lastErr is true if the last Consul call failed. It is used to
// squelch repeated error messages.
lastErr bool
logger hclog.Logger
}
// newCheckWatcher creates a new checkWatcher but does not call its Run method.
func newCheckWatcher(logger hclog.Logger, checksAPI ChecksAPI, namespacesClient *NamespacesClient) *checkWatcher {
return &checkWatcher{
namespacesClient: namespacesClient,
checksAPI: checksAPI,
pollFreq: defaultPollFreq,
checkUpdateCh: make(chan checkWatchUpdate, 8),
done: make(chan struct{}),
logger: logger.ResetNamed("consul.health"),
}
}
// Run the main Consul checks watching loop to restart tasks when their checks
// fail. Blocks until context is canceled.
func (w *checkWatcher) Run(ctx context.Context) {
defer close(w.done)
// map of check IDs to their metadata
checks := map[string]*checkRestart{}
// timer for check polling
checkTimer := time.NewTimer(0)
defer checkTimer.Stop() // ensure timer is never leaked
stopTimer := func() {
checkTimer.Stop()
select {
case <-checkTimer.C:
default:
}
}
// disable by default
stopTimer()
// Main watch loop
WATCHER:
for {
// disable polling if there are no checks
if len(checks) == 0 {
stopTimer()
}
select {
case update := <-w.checkUpdateCh:
if update.remove {
// Remove a check
delete(checks, update.checkID)
continue
}
// Add/update a check
checks[update.checkID] = update.checkRestart
w.logger.Debug("watching check", "alloc_id", update.checkRestart.allocID,
"task", update.checkRestart.taskName, "check", update.checkRestart.checkName)
// if first check was added make sure polling is enabled
if len(checks) == 1 {
stopTimer()
checkTimer.Reset(w.pollFreq)
}
case <-ctx.Done():
return
case <-checkTimer.C:
checkTimer.Reset(w.pollFreq)
// Set "now" as the point in time the following check results represent
now := time.Now()
// Get the list of all namespaces so we can iterate them.
namespaces, err := w.namespacesClient.List()
if err != nil {
if !w.lastErr {
w.lastErr = true
w.logger.Error("failed retrieving namespaces", "error", err)
}
continue WATCHER
}
checkResults := make(map[string]*api.AgentCheck)
for _, namespace := range namespaces {
nsResults, err := w.checksAPI.ChecksWithFilterOpts("", &api.QueryOptions{Namespace: normalizeNamespace(namespace)})
if err != nil {
if !w.lastErr {
w.lastErr = true
w.logger.Error("failed retrieving health checks", "error", err)
}
continue WATCHER
} else {
for k, v := range nsResults {
checkResults[k] = v
}
}
}
w.lastErr = false
// Keep track of tasks restarted this period so they
// are only restarted once and all of their checks are
// removed.
restartedTasks := map[string]struct{}{}
// Loop over watched checks and update their status from results
for cid, check := range checks {
// Shortcircuit if told to exit
if ctx.Err() != nil {
return
}
if _, ok := restartedTasks[check.taskKey]; ok {
// Check for this task already restarted; remove and skip check
delete(checks, cid)
continue
}
result, ok := checkResults[cid]
if !ok {
// Only warn if outside grace period to avoid races with check registration
if now.After(check.graceUntil) {
w.logger.Warn("watched check not found in Consul", "check", check.checkName, "check_id", cid)
}
continue
}
restarted := check.apply(ctx, now, result.Status)
if restarted {
// Checks are registered+watched on
// startup, so it's safe to remove them
// whenever they're restarted
delete(checks, cid)
restartedTasks[check.taskKey] = struct{}{}
}
}
// Ensure even passing checks for restartedTasks are removed
if len(restartedTasks) > 0 {
for cid, check := range checks {
if _, ok := restartedTasks[check.taskKey]; ok {
delete(checks, cid)
}
}
}
}
}
}
// Watch a check and restart its task if unhealthy.
func (w *checkWatcher) Watch(allocID, taskName, checkID string, check *structs.ServiceCheck, restarter serviceregistration.WorkloadRestarter) {
if !check.TriggersRestarts() {
// Not watched, noop
return
}
c := &checkRestart{
allocID: allocID,
taskName: taskName,
checkID: checkID,
checkName: check.Name,
taskKey: fmt.Sprintf("%s%s", allocID, taskName), // unique task ID
task: restarter,
interval: check.Interval,
grace: check.CheckRestart.Grace,
graceUntil: time.Now().Add(check.CheckRestart.Grace),
timeLimit: check.Interval * time.Duration(check.CheckRestart.Limit-1),
ignoreWarnings: check.CheckRestart.IgnoreWarnings,
logger: w.logger.With("alloc_id", allocID, "task", taskName, "check", check.Name),
}
update := checkWatchUpdate{
checkID: checkID,
checkRestart: c,
}
select {
case w.checkUpdateCh <- update:
// sent watch
case <-w.done:
// exited; nothing to do
}
}
// Unwatch a check.
func (w *checkWatcher) Unwatch(cid string) {
c := checkWatchUpdate{
checkID: cid,
remove: true,
}
select {
case w.checkUpdateCh <- c:
// sent remove watch
case <-w.done:
// exited; nothing to do
}
}


@@ -419,13 +419,43 @@ type ServiceClient struct {
deregisterProbationExpiry time.Time
// checkWatcher restarts checks that are unhealthy.
checkWatcher *checkWatcher
// checkWatcher *checkWatcher
checkWatcher *serviceregistration.CheckWatcher
// isClientAgent specifies whether this Consul client is being used
// by a Nomad client.
isClientAgent bool
}
// checkStatusGetter is the consul-specific implementation of serviceregistration.CheckStatusGetter
type checkStatusGetter struct {
agentAPI AgentAPI
namespacesClient *NamespacesClient
}
func (csg *checkStatusGetter) Get() (map[string]serviceregistration.CheckStatus, error) {
// Get the list of all namespaces so we can iterate them.
namespaces, err := csg.namespacesClient.List()
if err != nil {
return nil, err
}
results := make(map[string]serviceregistration.CheckStatus)
for _, namespace := range namespaces {
resultsInNamespace, err := csg.agentAPI.ChecksWithFilterOpts("", &api.QueryOptions{Namespace: normalizeNamespace(namespace)})
if err != nil {
return nil, err
}
for k, v := range resultsInNamespace {
results[k] = serviceregistration.CheckStatus{
Status: v.Status,
}
}
}
return results, nil
}
// NewServiceClient creates a new Consul ServiceClient from an existing Consul API
// Client, logger and takes whether the client is being used by a Nomad Client agent.
// When being used by a Nomad client, this Consul client reconciles all services and
@@ -450,9 +480,12 @@ func NewServiceClient(agentAPI AgentAPI, namespacesClient *NamespacesClient, log
allocRegistrations: make(map[string]*serviceregistration.AllocRegistration),
agentServices: make(map[string]struct{}),
agentChecks: make(map[string]struct{}),
checkWatcher: newCheckWatcher(logger, agentAPI, namespacesClient),
isClientAgent: isNomadClient,
deregisterProbationExpiry: time.Now().Add(deregisterProbationPeriod),
checkWatcher: serviceregistration.NewCheckWatcher(logger, &checkStatusGetter{
agentAPI: agentAPI,
namespacesClient: namespacesClient,
}),
}
}


@@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"reflect"
"strings"
"sync/atomic"
"testing"
@@ -329,380 +328,6 @@ func TestConsul_ChangePorts(t *testing.T) {
}
}
// TestConsul_ChangeChecks asserts that updating only the checks on a service
// properly syncs with Consul.
func TestConsul_ChangeChecks(t *testing.T) {
ci.Parallel(t)
ctx := setupFake(t)
ctx.Workload.Services[0].Checks = []*structs.ServiceCheck{
{
Name: "c1",
Type: "tcp",
Interval: time.Second,
Timeout: time.Second,
PortLabel: "x",
CheckRestart: &structs.CheckRestart{
Limit: 3,
},
},
}
if err := ctx.ServiceClient.RegisterWorkload(ctx.Workload); err != nil {
t.Fatalf("unexpected error registering task: %v", err)
}
if err := ctx.syncOnce(syncNewOps); err != nil {
t.Fatalf("unexpected error syncing task: %v", err)
}
if n := len(ctx.FakeConsul.services["default"]); n != 1 {
t.Fatalf("expected 1 service but found %d:\n%#v", n, ctx.FakeConsul.services["default"])
}
// Assert a check restart watch update was enqueued and clear it
if n := len(ctx.ServiceClient.checkWatcher.checkUpdateCh); n != 1 {
t.Fatalf("expected 1 check restart update but found %d", n)
}
upd := <-ctx.ServiceClient.checkWatcher.checkUpdateCh
c1ID := upd.checkID
// Query the allocs registrations and then again when we update. The IDs
// should change
reg1, err := ctx.ServiceClient.AllocRegistrations(ctx.Workload.AllocID)
if err != nil {
t.Fatalf("Looking up alloc registration failed: %v", err)
}
if reg1 == nil {
t.Fatalf("Nil alloc registrations: %v", err)
}
if num := reg1.NumServices(); num != 1 {
t.Fatalf("Wrong number of services: got %d; want 1", num)
}
if num := reg1.NumChecks(); num != 1 {
t.Fatalf("Wrong number of checks: got %d; want 1", num)
}
origServiceKey := ""
for k, v := range ctx.FakeConsul.services["default"] {
origServiceKey = k
if v.Name != ctx.Workload.Services[0].Name {
t.Errorf("expected Name=%q != %q", ctx.Workload.Services[0].Name, v.Name)
}
if v.Port != xPort {
t.Errorf("expected Port x=%v but found: %v", xPort, v.Port)
}
}
if n := len(ctx.FakeConsul.checks["default"]); n != 1 {
t.Fatalf("expected 1 check but found %d:\n%#v", n, ctx.FakeConsul.checks["default"])
}
for _, v := range ctx.FakeConsul.checks["default"] {
if v.Name != "c1" {
t.Fatalf("expected check c1 but found %q", v.Name)
}
}
// Now add a check and modify the original
origWorkload := ctx.Workload.Copy()
ctx.Workload.Services[0].Checks = []*structs.ServiceCheck{
{
Name: "c1",
Type: "tcp",
Interval: 2 * time.Second,
Timeout: time.Second,
PortLabel: "x",
CheckRestart: &structs.CheckRestart{
Limit: 3,
},
},
{
Name: "c2",
Type: "http",
Path: "/",
Interval: time.Second,
Timeout: time.Second,
PortLabel: "x",
},
}
if err := ctx.ServiceClient.UpdateWorkload(origWorkload, ctx.Workload); err != nil {
t.Fatalf("unexpected error registering task: %v", err)
}
// Assert 2 check restart watch updates was enqueued
if n := len(ctx.ServiceClient.checkWatcher.checkUpdateCh); n != 2 {
t.Fatalf("expected 2 check restart updates but found %d", n)
}
// First the new watch
upd = <-ctx.ServiceClient.checkWatcher.checkUpdateCh
if upd.checkID == c1ID || upd.remove {
t.Fatalf("expected check watch update to be an add of checkID=%q but found remove=%t checkID=%q",
c1ID, upd.remove, upd.checkID)
}
// Then remove the old watch
upd = <-ctx.ServiceClient.checkWatcher.checkUpdateCh
if upd.checkID != c1ID || !upd.remove {
t.Fatalf("expected check watch update to be a removal of checkID=%q but found remove=%t checkID=%q",
c1ID, upd.remove, upd.checkID)
}
if err := ctx.syncOnce(syncNewOps); err != nil {
t.Fatalf("unexpected error syncing task: %v", err)
}
if n := len(ctx.FakeConsul.services["default"]); n != 1 {
t.Fatalf("expected 1 service but found %d:\n%#v", n, ctx.FakeConsul.services["default"])
}
if _, ok := ctx.FakeConsul.services["default"][origServiceKey]; !ok {
t.Errorf("unexpected key change; was: %q -- but found %#v", origServiceKey, ctx.FakeConsul.services)
}
if n := len(ctx.FakeConsul.checks["default"]); n != 2 {
t.Fatalf("expected 2 check but found %d:\n%#v", n, ctx.FakeConsul.checks["default"])
}
for k, v := range ctx.FakeConsul.checks["default"] {
switch v.Name {
case "c1":
if expected := fmt.Sprintf(":%d", xPort); v.TCP != expected {
t.Errorf("expected Port x=%v but found: %v", expected, v.TCP)
}
// update id
c1ID = k
case "c2":
if expected := fmt.Sprintf("http://:%d/", xPort); v.HTTP != expected {
t.Errorf("expected Port x=%v but found: %v", expected, v.HTTP)
}
default:
t.Errorf("Unknown check: %q", k)
}
}
// Check again and ensure the IDs changed
reg2, err := ctx.ServiceClient.AllocRegistrations(ctx.Workload.AllocID)
if err != nil {
t.Fatalf("Looking up alloc registration failed: %v", err)
}
if reg2 == nil {
t.Fatalf("Nil alloc registrations: %v", err)
}
if num := reg2.NumServices(); num != 1 {
t.Fatalf("Wrong number of services: got %d; want 1", num)
}
if num := reg2.NumChecks(); num != 2 {
t.Fatalf("Wrong number of checks: got %d; want 2", num)
}
for task, treg := range reg1.Tasks {
otherTaskReg, ok := reg2.Tasks[task]
if !ok {
t.Fatalf("Task %q not in second reg", task)
}
for sID, sreg := range treg.Services {
otherServiceReg, ok := otherTaskReg.Services[sID]
if !ok {
t.Fatalf("service ID changed")
}
for newID := range sreg.CheckIDs {
if _, ok := otherServiceReg.CheckIDs[newID]; ok {
t.Fatalf("check IDs should change")
}
}
}
}
// Alter a CheckRestart and make sure the watcher is updated but nothing else
origWorkload = ctx.Workload.Copy()
ctx.Workload.Services[0].Checks = []*structs.ServiceCheck{
{
Name: "c1",
Type: "tcp",
Interval: 2 * time.Second,
Timeout: time.Second,
PortLabel: "x",
CheckRestart: &structs.CheckRestart{
Limit: 11,
},
},
{
Name: "c2",
Type: "http",
Path: "/",
Interval: time.Second,
Timeout: time.Second,
PortLabel: "x",
},
}
if err := ctx.ServiceClient.UpdateWorkload(origWorkload, ctx.Workload); err != nil {
t.Fatalf("unexpected error registering task: %v", err)
}
if err := ctx.syncOnce(syncNewOps); err != nil {
t.Fatalf("unexpected error syncing task: %v", err)
}
if n := len(ctx.FakeConsul.checks["default"]); n != 2 {
t.Fatalf("expected 2 check but found %d:\n%#v", n, ctx.FakeConsul.checks["default"])
}
for k, v := range ctx.FakeConsul.checks["default"] {
if v.Name == "c1" {
if k != c1ID {
t.Errorf("expected c1 to still have id %q but found %q", c1ID, k)
}
break
}
}
// Assert a check restart watch update was enqueued for a removal and an add
if n := len(ctx.ServiceClient.checkWatcher.checkUpdateCh); n != 1 {
t.Fatalf("expected 1 check restart update but found %d", n)
}
<-ctx.ServiceClient.checkWatcher.checkUpdateCh
}
// TestConsul_RegServices tests basic service registration.
func TestConsul_RegServices(t *testing.T) {
ci.Parallel(t)
ctx := setupFake(t)
// Add a check w/restarting
ctx.Workload.Services[0].Checks = []*structs.ServiceCheck{
{
Name: "testcheck",
Type: "tcp",
Interval: 100 * time.Millisecond,
CheckRestart: &structs.CheckRestart{
Limit: 3,
},
},
}
if err := ctx.ServiceClient.RegisterWorkload(ctx.Workload); err != nil {
t.Fatalf("unexpected error registering task: %v", err)
}
if err := ctx.syncOnce(syncNewOps); err != nil {
t.Fatalf("unexpected error syncing task: %v", err)
}
if n := len(ctx.FakeConsul.services["default"]); n != 1 {
t.Fatalf("expected 1 service but found %d:\n%#v", n, ctx.FakeConsul.services["default"])
}
for _, v := range ctx.FakeConsul.services["default"] {
if v.Name != ctx.Workload.Services[0].Name {
t.Errorf("expected Name=%q != %q", ctx.Workload.Services[0].Name, v.Name)
}
if !reflect.DeepEqual(v.Tags, ctx.Workload.Services[0].Tags) {
t.Errorf("expected Tags=%v != %v", ctx.Workload.Services[0].Tags, v.Tags)
}
if v.Port != xPort {
t.Errorf("expected Port=%d != %d", xPort, v.Port)
}
}
// Assert the check update is pending
if n := len(ctx.ServiceClient.checkWatcher.checkUpdateCh); n != 1 {
t.Fatalf("expected 1 check restart update but found %d", n)
}
// Assert the check update is properly formed
checkUpd := <-ctx.ServiceClient.checkWatcher.checkUpdateCh
if checkUpd.checkRestart.allocID != ctx.Workload.AllocID {
t.Fatalf("expected check's allocid to be %q but found %q", "allocid", checkUpd.checkRestart.allocID)
}
if expected := 200 * time.Millisecond; checkUpd.checkRestart.timeLimit != expected {
t.Fatalf("expected check's time limit to be %v but found %v", expected, checkUpd.checkRestart.timeLimit)
}
// Make a change which will register a new service
ctx.Workload.Services[0].Name = "taskname-service2"
ctx.Workload.Services[0].Tags[0] = "tag3"
if err := ctx.ServiceClient.RegisterWorkload(ctx.Workload); err != nil {
t.Fatalf("unexpected error registering task: %v", err)
}
// Assert check update is pending
if n := len(ctx.ServiceClient.checkWatcher.checkUpdateCh); n != 1 {
t.Fatalf("expected 1 check restart update but found %d", n)
}
// Assert the check update's id has changed
checkUpd2 := <-ctx.ServiceClient.checkWatcher.checkUpdateCh
if checkUpd.checkID == checkUpd2.checkID {
t.Fatalf("expected new check update to have a new ID both both have: %q", checkUpd.checkID)
}
// Make sure changes don't take effect until sync() is called (since
// Run() isn't running)
if n := len(ctx.FakeConsul.services["default"]); n != 1 {
t.Fatalf("expected 1 service but found %d:\n%#v", n, ctx.FakeConsul.services["default"])
}
for _, v := range ctx.FakeConsul.services["default"] {
if reflect.DeepEqual(v.Tags, ctx.Workload.Services[0].Tags) {
t.Errorf("expected Tags to differ, changes applied before sync()")
}
}
// Now sync() and re-check for the applied updates
if err := ctx.syncOnce(syncNewOps); err != nil {
t.Fatalf("unexpected error syncing task: %v", err)
}
if n := len(ctx.FakeConsul.services["default"]); n != 2 {
t.Fatalf("expected 2 services but found %d:\n%#v", n, ctx.FakeConsul.services["default"])
}
found := false
for _, v := range ctx.FakeConsul.services["default"] {
if v.Name == ctx.Workload.Services[0].Name {
if found {
t.Fatalf("found new service name %q twice", v.Name)
}
found = true
if !reflect.DeepEqual(v.Tags, ctx.Workload.Services[0].Tags) {
t.Errorf("expected Tags=%v != %v", ctx.Workload.Services[0].Tags, v.Tags)
}
}
}
if !found {
t.Fatalf("did not find new service %q", ctx.Workload.Services[0].Name)
}
// Remove the new task
ctx.ServiceClient.RemoveWorkload(ctx.Workload)
if err := ctx.syncOnce(syncNewOps); err != nil {
t.Fatalf("unexpected error syncing task: %v", err)
}
if n := len(ctx.FakeConsul.services["default"]); n != 1 {
t.Fatalf("expected 1 service but found %d:\n%#v", n, ctx.FakeConsul.services["default"])
}
for _, v := range ctx.FakeConsul.services["default"] {
if v.Name != "taskname-service" {
t.Errorf("expected original task to survive not %q", v.Name)
}
}
// Assert check update is pending
if n := len(ctx.ServiceClient.checkWatcher.checkUpdateCh); n != 1 {
t.Fatalf("expected 1 check restart update but found %d", n)
}
// Assert the check update's id is correct and that it's a removal
checkUpd3 := <-ctx.ServiceClient.checkWatcher.checkUpdateCh
if checkUpd2.checkID != checkUpd3.checkID {
t.Fatalf("expected checkid %q but found %q", checkUpd2.checkID, checkUpd3.checkID)
}
if !checkUpd3.remove {
t.Fatalf("expected check watch removal update but found: %#v", checkUpd3)
}
}
// TestConsul_ShutdownOK tests the ok path for the shutdown logic in
// ServiceClient.
func TestConsul_ShutdownOK(t *testing.T) {


@@ -47,7 +47,10 @@ const (
minCheckTimeout = 1 * time.Second
)
// ServiceCheck represents the Consul health check.
// ServiceCheck represents a Nomad or Consul service health check.
//
// The fields available depend on the service provider the check is being
// registered into.
type ServiceCheck struct {
Name string // Name of the check, defaults to a generated label
Type string // Type of the check - tcp, http, docker and script
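For reference, here is an illustrative ServiceCheck that opts into watcher-driven restarts; the field values are modeled on the checks used in the tests above, and only a check with CheckRestart set reports true from TriggersRestarts and is accepted by CheckWatcher.Watch.

package main

import (
	"fmt"
	"time"

	"github.com/hashicorp/nomad/nomad/structs"
)

func main() {
	check := &structs.ServiceCheck{
		Name:      "c1",
		Type:      "tcp",
		Interval:  time.Second,
		Timeout:   time.Second,
		PortLabel: "x",
		CheckRestart: &structs.CheckRestart{
			Limit:          3,
			Grace:          10 * time.Second,
			IgnoreWarnings: false,
		},
	}

	// Checks without check_restart are skipped by the watcher entirely.
	fmt.Println(check.TriggersRestarts()) // true
}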