diff --git a/client/serviceregistration/watcher.go b/client/serviceregistration/watcher.go new file mode 100644 index 000000000..a4b480a2c --- /dev/null +++ b/client/serviceregistration/watcher.go @@ -0,0 +1,309 @@ +package serviceregistration + +import ( + "context" + "fmt" + "time" + + "github.com/hashicorp/go-hclog" + "github.com/hashicorp/go-set" + "github.com/hashicorp/nomad/helper" + "github.com/hashicorp/nomad/nomad/structs" +) + +// composite of allocID + taskName for uniqueness +type key string + +type restarter struct { + allocID string + taskName string + checkID string + checkName string + taskKey key + + logger hclog.Logger + task WorkloadRestarter + grace time.Duration + interval time.Duration + timeLimit time.Duration + ignoreWarnings bool + + // unhealthyState is the time a check first went unhealthy. Set to the + // zero value if the check passes before timeLimit. + unhealthyState time.Time + + // graceUntil is when the check's grace period expires and unhealthy + // checks should be counted. + graceUntil time.Time +} + +// apply restart state for check and restart task if necessary. Current +// timestamp is passed in so all check updates have the same view of time (and +// to ease testing). +// +// Returns true if a restart was triggered in which case this check should be +// removed (checks are added on task startup). +func (r *restarter) apply(ctx context.Context, now time.Time, status string) bool { + healthy := func() { + if !r.unhealthyState.IsZero() { + r.logger.Debug("canceling restart because check became healthy") + r.unhealthyState = time.Time{} + } + } + switch status { + case "critical": + case "warning": + if r.ignoreWarnings { + // Warnings are ignored, reset state and exit + healthy() + return false + } + default: + // All other statuses are ok, reset state and exit + healthy() + return false + } + + if now.Before(r.graceUntil) { + // In grace period, exit + return false + } + + if r.unhealthyState.IsZero() { + // First failure, set restart deadline + if r.timeLimit != 0 { + r.logger.Debug("check became unhealthy. Will restart if check doesn't become healthy", "time_limit", r.timeLimit) + } + r.unhealthyState = now + } + + // restart timeLimit after start of this check becoming unhealthy + restartAt := r.unhealthyState.Add(r.timeLimit) + + // Must test >= because if limit=1, restartAt == first failure + if now.Equal(restartAt) || now.After(restartAt) { + // hasn't become healthy by deadline, restart! + r.logger.Debug("restarting due to unhealthy check") + + // Tell TaskRunner to restart due to failure + reason := fmt.Sprintf("healthcheck: check %q unhealthy", r.checkName) + event := structs.NewTaskEvent(structs.TaskRestartSignal).SetRestartReason(reason) + go asyncRestart(ctx, r.logger, r.task, event) + return true + } + + return false +} + +// asyncRestart mimics the pre-0.9 TaskRunner.Restart behavior and is intended +// to be called in a goroutine. +func asyncRestart(ctx context.Context, logger hclog.Logger, task WorkloadRestarter, event *structs.TaskEvent) { + // Check watcher restarts are always failures + const failure = true + + // Restarting is asynchronous so there's no reason to allow this + // goroutine to block indefinitely. + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + + if err := task.Restart(ctx, event, failure); err != nil { + // Restart errors are not actionable and only relevant when + // debugging allocation lifecycle management. 
+		logger.Debug("failed to restart task", "error", err, "event_time", event.Time, "event_type", event.Type)
+	}
+}
+
+// CheckStatus is the minimal status of a check as reported by a provider.
+type CheckStatus struct {
+	Status string // consider failure vs. critical, warnings, etc.
+}
+
+// CheckStatusGetter is implemented per-provider.
+type CheckStatusGetter interface {
+	// Get returns a map from CheckID -> (minimal) CheckStatus
+	Get() (map[string]CheckStatus, error)
+}
+
+// checkWatchUpdate is used to add or remove a check from the watcher.
+type checkWatchUpdate struct {
+	checkID string
+	remove  bool
+	restart *restarter
+}
+
+// CheckWatcher polls check statuses and restarts tasks whose checks remain
+// unhealthy for too long.
+type CheckWatcher struct {
+	logger hclog.Logger
+	getter CheckStatusGetter
+
+	// pollFrequency is how often to poll the checks API
+	pollFrequency time.Duration
+
+	// checkUpdateCh sends watches/removals to the main loop
+	checkUpdateCh chan checkWatchUpdate
+
+	// done is closed when Run has exited
+	done chan struct{}
+
+	// failedPreviousInterval indicates whether the previous poll failed; if
+	// so, repeated errors are squelched until a poll succeeds again
+	failedPreviousInterval bool
+}
+
+// NewCheckWatcher creates a CheckWatcher; the caller is responsible for
+// invoking its Run loop.
+func NewCheckWatcher(logger hclog.Logger, getter CheckStatusGetter) *CheckWatcher {
+	return &CheckWatcher{
+		logger:        logger.ResetNamed("watch.checks"),
+		getter:        getter,
+		pollFrequency: 1 * time.Second,
+		checkUpdateCh: make(chan checkWatchUpdate, 8),
+		done:          make(chan struct{}),
+	}
+}
+
+// Watch a check and restart its task if unhealthy.
+func (w *CheckWatcher) Watch(allocID, taskName, checkID string, check *structs.ServiceCheck, wr WorkloadRestarter) {
+	if !check.TriggersRestarts() {
+		return // check_restart not set; no-op
+	}
+
+	c := &restarter{
+		allocID:        allocID,
+		taskName:       taskName,
+		checkID:        checkID,
+		checkName:      check.Name,
+		taskKey:        key(allocID + taskName),
+		task:           wr,
+		interval:       check.Interval,
+		grace:          check.CheckRestart.Grace,
+		graceUntil:     time.Now().Add(check.CheckRestart.Grace),
+		timeLimit:      check.Interval * time.Duration(check.CheckRestart.Limit-1),
+		ignoreWarnings: check.CheckRestart.IgnoreWarnings,
+		logger:         w.logger.With("alloc_id", allocID, "task", taskName, "check", check.Name),
+	}
+
+	select {
+	case w.checkUpdateCh <- checkWatchUpdate{
+		checkID: checkID,
+		restart: c,
+	}: // activate watch
+	case <-w.done: // exited; nothing to do
+	}
+}
+
+// Unwatch a check.
+func (w *CheckWatcher) Unwatch(checkID string) {
+	select {
+	case w.checkUpdateCh <- checkWatchUpdate{
+		checkID: checkID,
+		remove:  true,
+	}: // deactivate watch
+	case <-w.done: // exited; nothing to do
+	}
+}
+
+// Run the watch loop, restarting tasks whose checks stay unhealthy past their
+// limit. Blocks until ctx is canceled.
+func (w *CheckWatcher) Run(ctx context.Context) {
+	defer close(w.done)
+
+	// map of checkID to its restarter handle (contains only the checks we are watching)
+	watched := make(map[string]*restarter)
+
+	checkTimer, cleanupCheckTimer := helper.NewSafeTimer(0)
+	defer cleanupCheckTimer()
+
+	stopCheckTimer := func() { // stop and drain the timer so it is safe to Reset
+		checkTimer.Stop()
+		select {
+		case <-checkTimer.C:
+		default:
+		}
+	}
+
+	// initialize with checkTimer disabled
+	stopCheckTimer()
+
+	for {
+		// disable polling if there are no checks
+		if len(watched) == 0 {
+			stopCheckTimer()
+		}
+
+		select {
+		// caller cancelled us; goodbye
+		case <-ctx.Done():
+			return
+
+		// received an update; add or remove check
+		case update := <-w.checkUpdateCh:
+			if update.remove {
+				delete(watched, update.checkID)
+				continue
+			}
+
+			watched[update.checkID] = update.restart
+			allocID := update.restart.allocID
+			taskName := update.restart.taskName
+			checkName := update.restart.checkName
+			w.logger.Trace("now watching check", "alloc_id", allocID, "task", taskName, "check", checkName)
+
+			// turn on the timer if we are now active
+			if len(watched) == 1 {
+				stopCheckTimer()
+				checkTimer.Reset(w.pollFrequency)
+			}
+
+		// poll time; refresh check statuses
+		case now := <-checkTimer.C:
+			w.interval(ctx, now, watched)
+			checkTimer.Reset(w.pollFrequency)
+		}
+	}
+}
+
+// interval runs a single poll of check statuses and applies the results to
+// the watched checks.
+func (w *CheckWatcher) interval(ctx context.Context, now time.Time, watched map[string]*restarter) {
+	statuses, err := w.getter.Get()
+	if err != nil {
+		// log only the first of consecutive failures to avoid spamming
+		if !w.failedPreviousInterval {
+			w.failedPreviousInterval = true
+			w.logger.Error("failed to retrieve check statuses", "error", err)
+		}
+		return
+	}
+	w.failedPreviousInterval = false
+
+	// keep track of tasks restarted this interval
+	restarts := set.New[key](len(statuses))
+
+	// iterate over the watched checks, applying each check's latest status
+	for checkID, checkRestarter := range watched {
+		if ctx.Err() != nil {
+			return // short circuit; caller cancelled us
+		}
+
+		if restarts.Contains(checkRestarter.taskKey) {
+			// skip; task is already being restarted
+			delete(watched, checkID)
+			continue
+		}
+
+		status, exists := statuses[checkID]
+		if !exists {
+			// warn only if outside the grace period, to avoid racing with check registration
+			if now.After(checkRestarter.graceUntil) {
+				w.logger.Warn("watched check not found", "check_id", checkID)
+			}
+			continue
+		}
+
+		if checkRestarter.apply(ctx, now, status.Status) {
+			// check will be re-registered & re-watched on startup
+			delete(watched, checkID)
+			restarts.Insert(checkRestarter.taskKey)
+		}
+	}
+
+	// purge passing checks of tasks that are being restarted
+	if restarts.Size() > 0 {
+		for checkID, checkRestarter := range watched {
+			if restarts.Contains(checkRestarter.taskKey) {
+				delete(watched, checkID)
+			}
+		}
+	}
+}
diff --git a/command/agent/consul/check_watcher_test.go b/client/serviceregistration/watcher_test.go
similarity index 56%
rename from command/agent/consul/check_watcher_test.go
rename to client/serviceregistration/watcher_test.go
index f27fffe9d..eebaf9f52 100644
--- a/command/agent/consul/check_watcher_test.go
+++ b/client/serviceregistration/watcher_test.go
@@ -1,4 +1,4 @@
-package consul
+package serviceregistration
 
 import (
 	"context"
@@ -7,30 +7,29 @@ import (
 	"testing"
 	"time"
 
-	
"github.com/hashicorp/consul/api" "github.com/hashicorp/nomad/ci" "github.com/hashicorp/nomad/helper/testlog" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/testutil" - "github.com/stretchr/testify/require" + "github.com/shoenig/test/must" ) -// checkRestartRecord is used by a testFakeCtx to record when restarts occur +// restartRecord is used by a fakeWorkloadRestarter to record when restarts occur // due to a watched check. -type checkRestartRecord struct { +type restartRecord struct { timestamp time.Time source string reason string failure bool } -// fakeCheckRestarter is a test implementation of TaskRestarter. -type fakeCheckRestarter struct { +// fakeWorkloadRestarter is a test implementation of TaskRestarter. +type fakeWorkloadRestarter struct { // restarts is a slice of all of the restarts triggered by the checkWatcher - restarts []checkRestartRecord + restarts []restartRecord // need the checkWatcher to re-Watch restarted tasks like TaskRunner - watcher *checkWatcher + watcher *CheckWatcher // check to re-Watch on restarts check *structs.ServiceCheck @@ -38,13 +37,12 @@ type fakeCheckRestarter struct { taskName string checkName string - mu sync.Mutex + lock sync.Mutex } -// newFakeCheckRestart creates a new TaskRestarter. It needs all of the -// parameters checkWatcher.Watch expects. -func newFakeCheckRestarter(w *checkWatcher, allocID, taskName, checkName string, c *structs.ServiceCheck) *fakeCheckRestarter { - return &fakeCheckRestarter{ +// newFakeCheckRestart creates a new mock WorkloadRestarter. +func newFakeWorkloadRestarter(w *CheckWatcher, allocID, taskName, checkName string, c *structs.ServiceCheck) *fakeWorkloadRestarter { + return &fakeWorkloadRestarter{ watcher: w, check: c, allocID: allocID, @@ -53,14 +51,15 @@ func newFakeCheckRestarter(w *checkWatcher, allocID, taskName, checkName string, } } -// Restart implements part of the TaskRestarter interface needed for check -// watching and is normally fulfilled by a TaskRunner. +// Restart implements part of the TaskRestarter interface needed for check watching +// and is normally fulfilled by a TaskRunner. // // Restarts are recorded in the []restarts field and re-Watch the check. -func (c *fakeCheckRestarter) Restart(ctx context.Context, event *structs.TaskEvent, failure bool) error { - c.mu.Lock() - defer c.mu.Unlock() - restart := checkRestartRecord{ +func (c *fakeWorkloadRestarter) Restart(_ context.Context, event *structs.TaskEvent, failure bool) error { + c.lock.Lock() + defer c.lock.Unlock() + + restart := restartRecord{ timestamp: time.Now(), source: event.Type, reason: event.DisplayMessage, @@ -73,10 +72,10 @@ func (c *fakeCheckRestarter) Restart(ctx context.Context, event *structs.TaskEve return nil } -// String for debugging -func (c *fakeCheckRestarter) String() string { - c.mu.Lock() - defer c.mu.Unlock() +// String is useful for debugging. 
+func (c *fakeWorkloadRestarter) String() string { + c.lock.Lock() + defer c.lock.Unlock() s := fmt.Sprintf("%s %s %s restarts:\n", c.allocID, c.taskName, c.checkName) for _, r := range c.restarts { @@ -85,76 +84,55 @@ func (c *fakeCheckRestarter) String() string { return s } -// GetRestarts for testing in a threadsafe way -func (c *fakeCheckRestarter) GetRestarts() []checkRestartRecord { - c.mu.Lock() - defer c.mu.Unlock() +// GetRestarts for testing in a thread-safe way +func (c *fakeWorkloadRestarter) GetRestarts() []restartRecord { + c.lock.Lock() + defer c.lock.Unlock() - o := make([]checkRestartRecord, len(c.restarts)) + o := make([]restartRecord, len(c.restarts)) copy(o, c.restarts) return o } -// checkResponse is a response returned by the fakeChecksAPI after the given -// time. -type checkResponse struct { +// response is a response returned by fakeCheckStatusGetter after a certain time +type response struct { at time.Time id string status string } -// fakeChecksAPI implements the Checks() method for testing Consul. -type fakeChecksAPI struct { - // responses is a map of check ids to their status at a particular - // time. checkResponses must be in chronological order. - responses map[string][]checkResponse - - mu sync.Mutex +// fakeCheckStatusGetter is a mock implementation of CheckStatusGetter +type fakeCheckStatusGetter struct { + lock sync.Mutex + responses map[string][]response } -func newFakeChecksAPI() *fakeChecksAPI { - return &fakeChecksAPI{responses: make(map[string][]checkResponse)} -} +func (g *fakeCheckStatusGetter) Get() (map[string]CheckStatus, error) { + g.lock.Lock() + defer g.lock.Unlock() -// add a new check status to Consul at the given time. -func (c *fakeChecksAPI) add(id, status string, at time.Time) { - c.mu.Lock() - c.responses[id] = append(c.responses[id], checkResponse{at, id, status}) - c.mu.Unlock() -} - -func (c *fakeChecksAPI) ChecksWithFilterOpts(filter string, opts *api.QueryOptions) (map[string]*api.AgentCheck, error) { - c.mu.Lock() - defer c.mu.Unlock() now := time.Now() - result := make(map[string]*api.AgentCheck, len(c.responses)) - - // Use the latest response for each check - for k, vs := range c.responses { + result := make(map[string]CheckStatus) + // use the newest response after now for the response + for k, vs := range g.responses { for _, v := range vs { if v.at.After(now) { break } - result[k] = &api.AgentCheck{ - CheckID: k, - Name: k, - Status: v.status, - } + result[k] = CheckStatus{Status: v.status} } } return result, nil } -// testWatcherSetup sets up a fakeChecksAPI and a real checkWatcher with a test -// logger and faster poll frequency. -func testWatcherSetup(t *testing.T) (*fakeChecksAPI, *checkWatcher) { - logger := testlog.HCLogger(t) - checksAPI := newFakeChecksAPI() - namespacesClient := NewNamespacesClient(NewMockNamespaces(nil), NewMockAgent(ossFeatures)) - cw := newCheckWatcher(logger, checksAPI, namespacesClient) - cw.pollFreq = 10 * time.Millisecond - return checksAPI, cw +func (g *fakeCheckStatusGetter) add(checkID, status string, at time.Time) { + g.lock.Lock() + defer g.lock.Unlock() + if g.responses == nil { + g.responses = make(map[string][]response) + } + g.responses[checkID] = append(g.responses[checkID], response{at, checkID, status}) } func testCheck() *structs.ServiceCheck { @@ -170,8 +148,22 @@ func testCheck() *structs.ServiceCheck { } } -// TestCheckWatcher_Skip asserts unwatched checks are ignored. 
-func TestCheckWatcher_Skip(t *testing.T) { +// testWatcherSetup sets up a fakeChecksAPI and a real checkWatcher with a test +// logger and faster poll frequency. +func testWatcherSetup(t *testing.T) (*fakeCheckStatusGetter, *CheckWatcher) { + logger := testlog.HCLogger(t) + getter := new(fakeCheckStatusGetter) + cw := NewCheckWatcher(logger, getter) + cw.pollFrequency = 10 * time.Millisecond + return getter, cw +} + +func before() time.Time { + return time.Now().Add(-10 * time.Second) +} + +// TestCheckWatcher_SkipUnwatched asserts unwatched checks are ignored. +func TestCheckWatcher_SkipUnwatched(t *testing.T) { ci.Parallel(t) // Create a check with restarting disabled @@ -179,65 +171,61 @@ func TestCheckWatcher_Skip(t *testing.T) { check.CheckRestart = nil logger := testlog.HCLogger(t) - checksAPI := newFakeChecksAPI() - namespacesClient := NewNamespacesClient(NewMockNamespaces(nil), NewMockAgent(ossFeatures)) + getter := new(fakeCheckStatusGetter) - cw := newCheckWatcher(logger, checksAPI, namespacesClient) - restarter1 := newFakeCheckRestarter(cw, "testalloc1", "testtask1", "testcheck1", check) + cw := NewCheckWatcher(logger, getter) + restarter1 := newFakeWorkloadRestarter(cw, "testalloc1", "testtask1", "testcheck1", check) cw.Watch("testalloc1", "testtask1", "testcheck1", check, restarter1) // Check should have been dropped as it's not watched - if n := len(cw.checkUpdateCh); n != 0 { - t.Fatalf("expected 0 checks to be enqueued for watching but found %d", n) - } + enqueued := len(cw.checkUpdateCh) + must.Zero(t, enqueued, must.Sprintf("expected 0 checks to be enqueued for watching but found %d", enqueued)) } // TestCheckWatcher_Healthy asserts healthy tasks are not restarted. func TestCheckWatcher_Healthy(t *testing.T) { ci.Parallel(t) - fakeAPI, cw := testWatcherSetup(t) + now := before() + getter, cw := testWatcherSetup(t) + + // Make both checks healthy from the beginning + getter.add("testcheck1", "passing", now) + getter.add("testcheck2", "passing", now) check1 := testCheck() - restarter1 := newFakeCheckRestarter(cw, "testalloc1", "testtask1", "testcheck1", check1) + restarter1 := newFakeWorkloadRestarter(cw, "testalloc1", "testtask1", "testcheck1", check1) cw.Watch("testalloc1", "testtask1", "testcheck1", check1, restarter1) check2 := testCheck() check2.CheckRestart.Limit = 1 check2.CheckRestart.Grace = 0 - restarter2 := newFakeCheckRestarter(cw, "testalloc2", "testtask2", "testcheck2", check2) + restarter2 := newFakeWorkloadRestarter(cw, "testalloc2", "testtask2", "testcheck2", check2) cw.Watch("testalloc2", "testtask2", "testcheck2", check2, restarter2) - // Make both checks healthy from the beginning - fakeAPI.add("testcheck1", "passing", time.Time{}) - fakeAPI.add("testcheck2", "passing", time.Time{}) - // Run ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) defer cancel() cw.Run(ctx) // Ensure restart was never called - if n := len(restarter1.restarts); n > 0 { - t.Errorf("expected check 1 to not be restarted but found %d:\n%s", n, restarter1) - } - if n := len(restarter2.restarts); n > 0 { - t.Errorf("expected check 2 to not be restarted but found %d:\n%s", n, restarter2) - } + must.Empty(t, restarter1.restarts, must.Sprint("expected check 1 to not be restarted")) + must.Empty(t, restarter2.restarts, must.Sprint("expected check 2 to not be restarted")) } // TestCheckWatcher_Unhealthy asserts unhealthy tasks are restarted exactly once. 
func TestCheckWatcher_Unhealthy(t *testing.T) { ci.Parallel(t) - fakeAPI, cw := testWatcherSetup(t) - - check1 := testCheck() - restarter1 := newFakeCheckRestarter(cw, "testalloc1", "testtask1", "testcheck1", check1) - cw.Watch("testalloc1", "testtask1", "testcheck1", check1, restarter1) + now := before() + getter, cw := testWatcherSetup(t) // Check has always been failing - fakeAPI.add("testcheck1", "critical", time.Time{}) + getter.add("testcheck1", "critical", now) + + check1 := testCheck() + restarter1 := newFakeWorkloadRestarter(cw, "testalloc1", "testtask1", "testcheck1", check1) + cw.Watch("testalloc1", "testtask1", "testcheck1", check1, restarter1) // Run ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) @@ -245,7 +233,7 @@ func TestCheckWatcher_Unhealthy(t *testing.T) { cw.Run(ctx) // Ensure restart was called exactly once - require.Len(t, restarter1.restarts, 1) + must.Len(t, 1, restarter1.restarts, must.Sprint("expected check to be restarted once")) } // TestCheckWatcher_HealthyWarning asserts checks in warning with @@ -253,27 +241,26 @@ func TestCheckWatcher_Unhealthy(t *testing.T) { func TestCheckWatcher_HealthyWarning(t *testing.T) { ci.Parallel(t) - fakeAPI, cw := testWatcherSetup(t) + now := before() + getter, cw := testWatcherSetup(t) + + // Check is always in warning but that's ok + getter.add("testcheck1", "warning", now) check1 := testCheck() check1.CheckRestart.Limit = 1 check1.CheckRestart.Grace = 0 check1.CheckRestart.IgnoreWarnings = true - restarter1 := newFakeCheckRestarter(cw, "testalloc1", "testtask1", "testcheck1", check1) + restarter1 := newFakeWorkloadRestarter(cw, "testalloc1", "testtask1", "testcheck1", check1) cw.Watch("testalloc1", "testtask1", "testcheck1", check1, restarter1) - // Check is always in warning but that's ok - fakeAPI.add("testcheck1", "warning", time.Time{}) - // Run ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) defer cancel() cw.Run(ctx) // Ensure restart was never called on check 1 - if n := len(restarter1.restarts); n > 0 { - t.Errorf("expected check 1 to not be restarted but found %d", n) - } + must.Empty(t, restarter1.restarts, must.Sprint("expected check 1 to not be restarted")) } // TestCheckWatcher_Flapping asserts checks that flap from healthy to unhealthy @@ -281,56 +268,53 @@ func TestCheckWatcher_HealthyWarning(t *testing.T) { func TestCheckWatcher_Flapping(t *testing.T) { ci.Parallel(t) - fakeAPI, cw := testWatcherSetup(t) + getter, cw := testWatcherSetup(t) check1 := testCheck() check1.CheckRestart.Grace = 0 - restarter1 := newFakeCheckRestarter(cw, "testalloc1", "testtask1", "testcheck1", check1) + restarter1 := newFakeWorkloadRestarter(cw, "testalloc1", "testtask1", "testcheck1", check1) cw.Watch("testalloc1", "testtask1", "testcheck1", check1, restarter1) // Check flaps and is never failing for the full 200ms needed to restart now := time.Now() - fakeAPI.add("testcheck1", "passing", now) - fakeAPI.add("testcheck1", "critical", now.Add(100*time.Millisecond)) - fakeAPI.add("testcheck1", "passing", now.Add(250*time.Millisecond)) - fakeAPI.add("testcheck1", "critical", now.Add(300*time.Millisecond)) - fakeAPI.add("testcheck1", "passing", now.Add(450*time.Millisecond)) + getter.add("testcheck1", "passing", now) + getter.add("testcheck1", "critical", now.Add(100*time.Millisecond)) + getter.add("testcheck1", "passing", now.Add(250*time.Millisecond)) + getter.add("testcheck1", "critical", now.Add(300*time.Millisecond)) + getter.add("testcheck1", "passing", 
now.Add(450*time.Millisecond)) ctx, cancel := context.WithTimeout(context.Background(), 600*time.Millisecond) defer cancel() cw.Run(ctx) // Ensure restart was never called on check 1 - if n := len(restarter1.restarts); n > 0 { - t.Errorf("expected check 1 to not be restarted but found %d\n%s", n, restarter1) - } + must.Empty(t, restarter1.restarts, must.Sprint("expected check 1 to not be restarted")) } // TestCheckWatcher_Unwatch asserts unwatching checks prevents restarts. func TestCheckWatcher_Unwatch(t *testing.T) { ci.Parallel(t) - fakeAPI, cw := testWatcherSetup(t) + now := before() + getter, cw := testWatcherSetup(t) + + // Always failing + getter.add("testcheck1", "critical", now) // Unwatch immediately check1 := testCheck() check1.CheckRestart.Limit = 1 check1.CheckRestart.Grace = 100 * time.Millisecond - restarter1 := newFakeCheckRestarter(cw, "testalloc1", "testtask1", "testcheck1", check1) + restarter1 := newFakeWorkloadRestarter(cw, "testalloc1", "testtask1", "testcheck1", check1) cw.Watch("testalloc1", "testtask1", "testcheck1", check1, restarter1) cw.Unwatch("testcheck1") - // Always failing - fakeAPI.add("testcheck1", "critical", time.Time{}) - ctx, cancel := context.WithTimeout(context.Background(), 300*time.Millisecond) defer cancel() cw.Run(ctx) // Ensure restart was never called on check 1 - if n := len(restarter1.restarts); n > 0 { - t.Errorf("expected check 1 to not be restarted but found %d\n%s", n, restarter1) - } + must.Empty(t, restarter1.restarts, must.Sprint("expected check 1 to not be restarted")) } // TestCheckWatcher_MultipleChecks asserts that when there are multiple checks @@ -339,31 +323,34 @@ func TestCheckWatcher_Unwatch(t *testing.T) { func TestCheckWatcher_MultipleChecks(t *testing.T) { ci.Parallel(t) - fakeAPI, cw := testWatcherSetup(t) + getter, cw := testWatcherSetup(t) + + // check is critical, 3 passing; should only be 1 net restart + now := time.Now() + getter.add("testcheck1", "critical", before()) + getter.add("testcheck1", "passing", now.Add(150*time.Millisecond)) + getter.add("testcheck2", "critical", before()) + getter.add("testcheck2", "passing", now.Add(150*time.Millisecond)) + getter.add("testcheck3", "passing", time.Time{}) check1 := testCheck() + check1.Name = "testcheck1" check1.CheckRestart.Limit = 1 - restarter1 := newFakeCheckRestarter(cw, "testalloc1", "testtask1", "testcheck1", check1) + restarter1 := newFakeWorkloadRestarter(cw, "testalloc1", "testtask1", "testcheck1", check1) cw.Watch("testalloc1", "testtask1", "testcheck1", check1, restarter1) check2 := testCheck() + check2.Name = "testcheck2" check2.CheckRestart.Limit = 1 - restarter2 := newFakeCheckRestarter(cw, "testalloc1", "testtask1", "testcheck2", check2) + restarter2 := newFakeWorkloadRestarter(cw, "testalloc1", "testtask1", "testcheck2", check2) cw.Watch("testalloc1", "testtask1", "testcheck2", check2, restarter2) check3 := testCheck() + check3.Name = "testcheck3" check3.CheckRestart.Limit = 1 - restarter3 := newFakeCheckRestarter(cw, "testalloc1", "testtask1", "testcheck3", check3) + restarter3 := newFakeWorkloadRestarter(cw, "testalloc1", "testtask1", "testcheck3", check3) cw.Watch("testalloc1", "testtask1", "testcheck3", check3, restarter3) - // check 2 & 3 fail long enough to cause 1 restart, but only 1 should restart - now := time.Now() - fakeAPI.add("testcheck1", "critical", now) - fakeAPI.add("testcheck1", "passing", now.Add(150*time.Millisecond)) - fakeAPI.add("testcheck2", "critical", now) - fakeAPI.add("testcheck2", "passing", now.Add(150*time.Millisecond)) - 
fakeAPI.add("testcheck3", "passing", time.Time{}) - // Run ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) defer cancel() @@ -388,17 +375,17 @@ func TestCheckWatcher_MultipleChecks(t *testing.T) { func TestCheckWatcher_Deadlock(t *testing.T) { ci.Parallel(t) - fakeAPI, cw := testWatcherSetup(t) + getter, cw := testWatcherSetup(t) // If TR.Restart blocks, restarting len(checkUpdateCh)+1 checks causes // a deadlock due to checkWatcher.Run being blocked in // checkRestart.apply and unable to process updates from the chan! n := cap(cw.checkUpdateCh) + 1 checks := make([]*structs.ServiceCheck, n) - restarters := make([]*fakeCheckRestarter, n) + restarters := make([]*fakeWorkloadRestarter, n) for i := 0; i < n; i++ { c := testCheck() - r := newFakeCheckRestarter(cw, + r := newFakeWorkloadRestarter(cw, fmt.Sprintf("alloc%d", i), fmt.Sprintf("task%d", i), fmt.Sprintf("check%d", i), @@ -420,7 +407,7 @@ func TestCheckWatcher_Deadlock(t *testing.T) { // Make them all fail for _, r := range restarters { - fakeAPI.add(r.checkName, "critical", time.Time{}) + getter.add(r.checkName, "critical", time.Time{}) } // Ensure that restart was called exactly once on all checks @@ -432,6 +419,6 @@ func TestCheckWatcher_Deadlock(t *testing.T) { } return true, nil }, func(err error) { - require.NoError(t, err) + must.NoError(t, err) }) } diff --git a/command/agent/consul/check_watcher.go b/command/agent/consul/check_watcher.go deleted file mode 100644 index 533a98a10..000000000 --- a/command/agent/consul/check_watcher.go +++ /dev/null @@ -1,355 +0,0 @@ -package consul - -import ( - "context" - "fmt" - "time" - - "github.com/hashicorp/consul/api" - "github.com/hashicorp/go-hclog" - "github.com/hashicorp/nomad/client/serviceregistration" - "github.com/hashicorp/nomad/nomad/structs" -) - -const ( - // defaultPollFreq is the default rate to poll the Consul Checks API - defaultPollFreq = 900 * time.Millisecond -) - -// ChecksAPI is the part of the Consul API the checkWatcher requires. -type ChecksAPI interface { - ChecksWithFilterOpts(filter string, q *api.QueryOptions) (map[string]*api.AgentCheck, error) -} - -// checkRestart handles restarting a task if a check is unhealthy. -type checkRestart struct { - allocID string - taskName string - checkID string - checkName string - taskKey string // composite of allocID + taskName for uniqueness - - task serviceregistration.WorkloadRestarter - grace time.Duration - interval time.Duration - timeLimit time.Duration - ignoreWarnings bool - - // Mutable fields - - // unhealthyState is the time a check first went unhealthy. Set to the - // zero value if the check passes before timeLimit. - unhealthyState time.Time - - // graceUntil is when the check's grace period expires and unhealthy - // checks should be counted. - graceUntil time.Time - - logger hclog.Logger -} - -// apply restart state for check and restart task if necessary. Current -// timestamp is passed in so all check updates have the same view of time (and -// to ease testing). -// -// Returns true if a restart was triggered in which case this check should be -// removed (checks are added on task startup). 
-func (c *checkRestart) apply(ctx context.Context, now time.Time, status string) bool { - healthy := func() { - if !c.unhealthyState.IsZero() { - c.logger.Debug("canceling restart because check became healthy") - c.unhealthyState = time.Time{} - } - } - switch status { - case api.HealthCritical: - case api.HealthWarning: - if c.ignoreWarnings { - // Warnings are ignored, reset state and exit - healthy() - return false - } - default: - // All other statuses are ok, reset state and exit - healthy() - return false - } - - if now.Before(c.graceUntil) { - // In grace period, exit - return false - } - - if c.unhealthyState.IsZero() { - // First failure, set restart deadline - if c.timeLimit != 0 { - c.logger.Debug("check became unhealthy. Will restart if check doesn't become healthy", "time_limit", c.timeLimit) - } - c.unhealthyState = now - } - - // restart timeLimit after start of this check becoming unhealthy - restartAt := c.unhealthyState.Add(c.timeLimit) - - // Must test >= because if limit=1, restartAt == first failure - if now.Equal(restartAt) || now.After(restartAt) { - // hasn't become healthy by deadline, restart! - c.logger.Debug("restarting due to unhealthy check") - - // Tell TaskRunner to restart due to failure - reason := fmt.Sprintf("healthcheck: check %q unhealthy", c.checkName) - event := structs.NewTaskEvent(structs.TaskRestartSignal).SetRestartReason(reason) - go asyncRestart(ctx, c.logger, c.task, event) - return true - } - - return false -} - -// asyncRestart mimics the pre-0.9 TaskRunner.Restart behavior and is intended -// to be called in a goroutine. -func asyncRestart(ctx context.Context, logger hclog.Logger, task serviceregistration.WorkloadRestarter, event *structs.TaskEvent) { - // Check watcher restarts are always failures - const failure = true - - // Restarting is asynchronous so there's no reason to allow this - // goroutine to block indefinitely. - ctx, cancel := context.WithTimeout(ctx, 10*time.Second) - defer cancel() - - if err := task.Restart(ctx, event, failure); err != nil { - // Restart errors are not actionable and only relevant when - // debugging allocation lifecycle management. - logger.Debug("failed to restart task", "error", err, - "event_time", event.Time, "event_type", event.Type) - } -} - -// checkWatchUpdates add or remove checks from the watcher -type checkWatchUpdate struct { - checkID string - remove bool - checkRestart *checkRestart -} - -// checkWatcher watches Consul checks and restarts tasks when they're -// unhealthy. -type checkWatcher struct { - namespacesClient *NamespacesClient - checksAPI ChecksAPI - - // pollFreq is how often to poll the checks API and defaults to - // defaultPollFreq - pollFreq time.Duration - - // checkUpdateCh is how watches (and removals) are sent to the main - // watching loop - checkUpdateCh chan checkWatchUpdate - - // done is closed when Run has exited - done chan struct{} - - // lastErr is true if the last Consul call failed. It is used to - // squelch repeated error messages. - lastErr bool - - logger hclog.Logger -} - -// newCheckWatcher creates a new checkWatcher but does not call its Run method. 
-func newCheckWatcher(logger hclog.Logger, checksAPI ChecksAPI, namespacesClient *NamespacesClient) *checkWatcher { - return &checkWatcher{ - namespacesClient: namespacesClient, - checksAPI: checksAPI, - pollFreq: defaultPollFreq, - checkUpdateCh: make(chan checkWatchUpdate, 8), - done: make(chan struct{}), - logger: logger.ResetNamed("consul.health"), - } -} - -// Run the main Consul checks watching loop to restart tasks when their checks -// fail. Blocks until context is canceled. -func (w *checkWatcher) Run(ctx context.Context) { - defer close(w.done) - - // map of check IDs to their metadata - checks := map[string]*checkRestart{} - - // timer for check polling - checkTimer := time.NewTimer(0) - defer checkTimer.Stop() // ensure timer is never leaked - - stopTimer := func() { - checkTimer.Stop() - select { - case <-checkTimer.C: - default: - } - } - - // disable by default - stopTimer() - - // Main watch loop -WATCHER: - for { - // disable polling if there are no checks - if len(checks) == 0 { - stopTimer() - } - - select { - case update := <-w.checkUpdateCh: - if update.remove { - // Remove a check - delete(checks, update.checkID) - continue - } - - // Add/update a check - checks[update.checkID] = update.checkRestart - w.logger.Debug("watching check", "alloc_id", update.checkRestart.allocID, - "task", update.checkRestart.taskName, "check", update.checkRestart.checkName) - - // if first check was added make sure polling is enabled - if len(checks) == 1 { - stopTimer() - checkTimer.Reset(w.pollFreq) - } - - case <-ctx.Done(): - return - - case <-checkTimer.C: - checkTimer.Reset(w.pollFreq) - - // Set "now" as the point in time the following check results represent - now := time.Now() - - // Get the list of all namespaces so we can iterate them. - namespaces, err := w.namespacesClient.List() - if err != nil { - if !w.lastErr { - w.lastErr = true - w.logger.Error("failed retrieving namespaces", "error", err) - } - continue WATCHER - } - - checkResults := make(map[string]*api.AgentCheck) - for _, namespace := range namespaces { - nsResults, err := w.checksAPI.ChecksWithFilterOpts("", &api.QueryOptions{Namespace: normalizeNamespace(namespace)}) - if err != nil { - if !w.lastErr { - w.lastErr = true - w.logger.Error("failed retrieving health checks", "error", err) - } - continue WATCHER - } else { - for k, v := range nsResults { - checkResults[k] = v - } - } - } - - w.lastErr = false - - // Keep track of tasks restarted this period so they - // are only restarted once and all of their checks are - // removed. 
- restartedTasks := map[string]struct{}{} - - // Loop over watched checks and update their status from results - for cid, check := range checks { - // Shortcircuit if told to exit - if ctx.Err() != nil { - return - } - - if _, ok := restartedTasks[check.taskKey]; ok { - // Check for this task already restarted; remove and skip check - delete(checks, cid) - continue - } - - result, ok := checkResults[cid] - if !ok { - // Only warn if outside grace period to avoid races with check registration - if now.After(check.graceUntil) { - w.logger.Warn("watched check not found in Consul", "check", check.checkName, "check_id", cid) - } - continue - } - - restarted := check.apply(ctx, now, result.Status) - if restarted { - // Checks are registered+watched on - // startup, so it's safe to remove them - // whenever they're restarted - delete(checks, cid) - - restartedTasks[check.taskKey] = struct{}{} - } - } - - // Ensure even passing checks for restartedTasks are removed - if len(restartedTasks) > 0 { - for cid, check := range checks { - if _, ok := restartedTasks[check.taskKey]; ok { - delete(checks, cid) - } - } - } - } - } -} - -// Watch a check and restart its task if unhealthy. -func (w *checkWatcher) Watch(allocID, taskName, checkID string, check *structs.ServiceCheck, restarter serviceregistration.WorkloadRestarter) { - if !check.TriggersRestarts() { - // Not watched, noop - return - } - - c := &checkRestart{ - allocID: allocID, - taskName: taskName, - checkID: checkID, - checkName: check.Name, - taskKey: fmt.Sprintf("%s%s", allocID, taskName), // unique task ID - task: restarter, - interval: check.Interval, - grace: check.CheckRestart.Grace, - graceUntil: time.Now().Add(check.CheckRestart.Grace), - timeLimit: check.Interval * time.Duration(check.CheckRestart.Limit-1), - ignoreWarnings: check.CheckRestart.IgnoreWarnings, - logger: w.logger.With("alloc_id", allocID, "task", taskName, "check", check.Name), - } - - update := checkWatchUpdate{ - checkID: checkID, - checkRestart: c, - } - - select { - case w.checkUpdateCh <- update: - // sent watch - case <-w.done: - // exited; nothing to do - } -} - -// Unwatch a check. -func (w *checkWatcher) Unwatch(cid string) { - c := checkWatchUpdate{ - checkID: cid, - remove: true, - } - select { - case w.checkUpdateCh <- c: - // sent remove watch - case <-w.done: - // exited; nothing to do - } -} diff --git a/command/agent/consul/service_client.go b/command/agent/consul/service_client.go index 77b71f9ab..7e720f195 100644 --- a/command/agent/consul/service_client.go +++ b/command/agent/consul/service_client.go @@ -419,13 +419,43 @@ type ServiceClient struct { deregisterProbationExpiry time.Time // checkWatcher restarts checks that are unhealthy. - checkWatcher *checkWatcher + // checkWatcher *checkWatcher + checkWatcher *serviceregistration.CheckWatcher // isClientAgent specifies whether this Consul client is being used // by a Nomad client. isClientAgent bool } +// checkStatusGetter is the consul-specific implementation of serviceregistration.CheckStatusGetter +type checkStatusGetter struct { + agentAPI AgentAPI + namespacesClient *NamespacesClient +} + +func (csg *checkStatusGetter) Get() (map[string]serviceregistration.CheckStatus, error) { + // Get the list of all namespaces so we can iterate them. 
+ namespaces, err := csg.namespacesClient.List() + if err != nil { + return nil, err + } + + results := make(map[string]serviceregistration.CheckStatus) + for _, namespace := range namespaces { + resultsInNamespace, err := csg.agentAPI.ChecksWithFilterOpts("", &api.QueryOptions{Namespace: normalizeNamespace(namespace)}) + if err != nil { + return nil, err + } + + for k, v := range resultsInNamespace { + results[k] = serviceregistration.CheckStatus{ + Status: v.Status, + } + } + } + return results, nil +} + // NewServiceClient creates a new Consul ServiceClient from an existing Consul API // Client, logger and takes whether the client is being used by a Nomad Client agent. // When being used by a Nomad client, this Consul client reconciles all services and @@ -450,9 +480,12 @@ func NewServiceClient(agentAPI AgentAPI, namespacesClient *NamespacesClient, log allocRegistrations: make(map[string]*serviceregistration.AllocRegistration), agentServices: make(map[string]struct{}), agentChecks: make(map[string]struct{}), - checkWatcher: newCheckWatcher(logger, agentAPI, namespacesClient), isClientAgent: isNomadClient, deregisterProbationExpiry: time.Now().Add(deregisterProbationPeriod), + checkWatcher: serviceregistration.NewCheckWatcher(logger, &checkStatusGetter{ + agentAPI: agentAPI, + namespacesClient: namespacesClient, + }), } } diff --git a/command/agent/consul/unit_test.go b/command/agent/consul/unit_test.go index f7328bfee..8f1c00e9b 100644 --- a/command/agent/consul/unit_test.go +++ b/command/agent/consul/unit_test.go @@ -4,7 +4,6 @@ import ( "context" "errors" "fmt" - "reflect" "strings" "sync/atomic" "testing" @@ -329,380 +328,6 @@ func TestConsul_ChangePorts(t *testing.T) { } } -// TestConsul_ChangeChecks asserts that updating only the checks on a service -// properly syncs with Consul. -func TestConsul_ChangeChecks(t *testing.T) { - ci.Parallel(t) - - ctx := setupFake(t) - ctx.Workload.Services[0].Checks = []*structs.ServiceCheck{ - { - Name: "c1", - Type: "tcp", - Interval: time.Second, - Timeout: time.Second, - PortLabel: "x", - CheckRestart: &structs.CheckRestart{ - Limit: 3, - }, - }, - } - - if err := ctx.ServiceClient.RegisterWorkload(ctx.Workload); err != nil { - t.Fatalf("unexpected error registering task: %v", err) - } - - if err := ctx.syncOnce(syncNewOps); err != nil { - t.Fatalf("unexpected error syncing task: %v", err) - } - - if n := len(ctx.FakeConsul.services["default"]); n != 1 { - t.Fatalf("expected 1 service but found %d:\n%#v", n, ctx.FakeConsul.services["default"]) - } - - // Assert a check restart watch update was enqueued and clear it - if n := len(ctx.ServiceClient.checkWatcher.checkUpdateCh); n != 1 { - t.Fatalf("expected 1 check restart update but found %d", n) - } - upd := <-ctx.ServiceClient.checkWatcher.checkUpdateCh - c1ID := upd.checkID - - // Query the allocs registrations and then again when we update. 
The IDs - // should change - reg1, err := ctx.ServiceClient.AllocRegistrations(ctx.Workload.AllocID) - if err != nil { - t.Fatalf("Looking up alloc registration failed: %v", err) - } - if reg1 == nil { - t.Fatalf("Nil alloc registrations: %v", err) - } - if num := reg1.NumServices(); num != 1 { - t.Fatalf("Wrong number of services: got %d; want 1", num) - } - if num := reg1.NumChecks(); num != 1 { - t.Fatalf("Wrong number of checks: got %d; want 1", num) - } - - origServiceKey := "" - for k, v := range ctx.FakeConsul.services["default"] { - origServiceKey = k - if v.Name != ctx.Workload.Services[0].Name { - t.Errorf("expected Name=%q != %q", ctx.Workload.Services[0].Name, v.Name) - } - if v.Port != xPort { - t.Errorf("expected Port x=%v but found: %v", xPort, v.Port) - } - } - - if n := len(ctx.FakeConsul.checks["default"]); n != 1 { - t.Fatalf("expected 1 check but found %d:\n%#v", n, ctx.FakeConsul.checks["default"]) - } - for _, v := range ctx.FakeConsul.checks["default"] { - if v.Name != "c1" { - t.Fatalf("expected check c1 but found %q", v.Name) - } - } - - // Now add a check and modify the original - origWorkload := ctx.Workload.Copy() - ctx.Workload.Services[0].Checks = []*structs.ServiceCheck{ - { - Name: "c1", - Type: "tcp", - Interval: 2 * time.Second, - Timeout: time.Second, - PortLabel: "x", - CheckRestart: &structs.CheckRestart{ - Limit: 3, - }, - }, - { - Name: "c2", - Type: "http", - Path: "/", - Interval: time.Second, - Timeout: time.Second, - PortLabel: "x", - }, - } - if err := ctx.ServiceClient.UpdateWorkload(origWorkload, ctx.Workload); err != nil { - t.Fatalf("unexpected error registering task: %v", err) - } - - // Assert 2 check restart watch updates was enqueued - if n := len(ctx.ServiceClient.checkWatcher.checkUpdateCh); n != 2 { - t.Fatalf("expected 2 check restart updates but found %d", n) - } - - // First the new watch - upd = <-ctx.ServiceClient.checkWatcher.checkUpdateCh - if upd.checkID == c1ID || upd.remove { - t.Fatalf("expected check watch update to be an add of checkID=%q but found remove=%t checkID=%q", - c1ID, upd.remove, upd.checkID) - } - - // Then remove the old watch - upd = <-ctx.ServiceClient.checkWatcher.checkUpdateCh - if upd.checkID != c1ID || !upd.remove { - t.Fatalf("expected check watch update to be a removal of checkID=%q but found remove=%t checkID=%q", - c1ID, upd.remove, upd.checkID) - } - - if err := ctx.syncOnce(syncNewOps); err != nil { - t.Fatalf("unexpected error syncing task: %v", err) - } - - if n := len(ctx.FakeConsul.services["default"]); n != 1 { - t.Fatalf("expected 1 service but found %d:\n%#v", n, ctx.FakeConsul.services["default"]) - } - - if _, ok := ctx.FakeConsul.services["default"][origServiceKey]; !ok { - t.Errorf("unexpected key change; was: %q -- but found %#v", origServiceKey, ctx.FakeConsul.services) - } - - if n := len(ctx.FakeConsul.checks["default"]); n != 2 { - t.Fatalf("expected 2 check but found %d:\n%#v", n, ctx.FakeConsul.checks["default"]) - } - - for k, v := range ctx.FakeConsul.checks["default"] { - switch v.Name { - case "c1": - if expected := fmt.Sprintf(":%d", xPort); v.TCP != expected { - t.Errorf("expected Port x=%v but found: %v", expected, v.TCP) - } - - // update id - c1ID = k - case "c2": - if expected := fmt.Sprintf("http://:%d/", xPort); v.HTTP != expected { - t.Errorf("expected Port x=%v but found: %v", expected, v.HTTP) - } - default: - t.Errorf("Unknown check: %q", k) - } - } - - // Check again and ensure the IDs changed - reg2, err := 
ctx.ServiceClient.AllocRegistrations(ctx.Workload.AllocID) - if err != nil { - t.Fatalf("Looking up alloc registration failed: %v", err) - } - if reg2 == nil { - t.Fatalf("Nil alloc registrations: %v", err) - } - if num := reg2.NumServices(); num != 1 { - t.Fatalf("Wrong number of services: got %d; want 1", num) - } - if num := reg2.NumChecks(); num != 2 { - t.Fatalf("Wrong number of checks: got %d; want 2", num) - } - - for task, treg := range reg1.Tasks { - otherTaskReg, ok := reg2.Tasks[task] - if !ok { - t.Fatalf("Task %q not in second reg", task) - } - - for sID, sreg := range treg.Services { - otherServiceReg, ok := otherTaskReg.Services[sID] - if !ok { - t.Fatalf("service ID changed") - } - - for newID := range sreg.CheckIDs { - if _, ok := otherServiceReg.CheckIDs[newID]; ok { - t.Fatalf("check IDs should change") - } - } - } - } - - // Alter a CheckRestart and make sure the watcher is updated but nothing else - origWorkload = ctx.Workload.Copy() - ctx.Workload.Services[0].Checks = []*structs.ServiceCheck{ - { - Name: "c1", - Type: "tcp", - Interval: 2 * time.Second, - Timeout: time.Second, - PortLabel: "x", - CheckRestart: &structs.CheckRestart{ - Limit: 11, - }, - }, - { - Name: "c2", - Type: "http", - Path: "/", - Interval: time.Second, - Timeout: time.Second, - PortLabel: "x", - }, - } - if err := ctx.ServiceClient.UpdateWorkload(origWorkload, ctx.Workload); err != nil { - t.Fatalf("unexpected error registering task: %v", err) - } - if err := ctx.syncOnce(syncNewOps); err != nil { - t.Fatalf("unexpected error syncing task: %v", err) - } - - if n := len(ctx.FakeConsul.checks["default"]); n != 2 { - t.Fatalf("expected 2 check but found %d:\n%#v", n, ctx.FakeConsul.checks["default"]) - } - - for k, v := range ctx.FakeConsul.checks["default"] { - if v.Name == "c1" { - if k != c1ID { - t.Errorf("expected c1 to still have id %q but found %q", c1ID, k) - } - break - } - } - - // Assert a check restart watch update was enqueued for a removal and an add - if n := len(ctx.ServiceClient.checkWatcher.checkUpdateCh); n != 1 { - t.Fatalf("expected 1 check restart update but found %d", n) - } - <-ctx.ServiceClient.checkWatcher.checkUpdateCh -} - -// TestConsul_RegServices tests basic service registration. 
-func TestConsul_RegServices(t *testing.T) { - ci.Parallel(t) - - ctx := setupFake(t) - - // Add a check w/restarting - ctx.Workload.Services[0].Checks = []*structs.ServiceCheck{ - { - Name: "testcheck", - Type: "tcp", - Interval: 100 * time.Millisecond, - CheckRestart: &structs.CheckRestart{ - Limit: 3, - }, - }, - } - - if err := ctx.ServiceClient.RegisterWorkload(ctx.Workload); err != nil { - t.Fatalf("unexpected error registering task: %v", err) - } - - if err := ctx.syncOnce(syncNewOps); err != nil { - t.Fatalf("unexpected error syncing task: %v", err) - } - - if n := len(ctx.FakeConsul.services["default"]); n != 1 { - t.Fatalf("expected 1 service but found %d:\n%#v", n, ctx.FakeConsul.services["default"]) - } - - for _, v := range ctx.FakeConsul.services["default"] { - if v.Name != ctx.Workload.Services[0].Name { - t.Errorf("expected Name=%q != %q", ctx.Workload.Services[0].Name, v.Name) - } - if !reflect.DeepEqual(v.Tags, ctx.Workload.Services[0].Tags) { - t.Errorf("expected Tags=%v != %v", ctx.Workload.Services[0].Tags, v.Tags) - } - if v.Port != xPort { - t.Errorf("expected Port=%d != %d", xPort, v.Port) - } - } - - // Assert the check update is pending - if n := len(ctx.ServiceClient.checkWatcher.checkUpdateCh); n != 1 { - t.Fatalf("expected 1 check restart update but found %d", n) - } - - // Assert the check update is properly formed - checkUpd := <-ctx.ServiceClient.checkWatcher.checkUpdateCh - if checkUpd.checkRestart.allocID != ctx.Workload.AllocID { - t.Fatalf("expected check's allocid to be %q but found %q", "allocid", checkUpd.checkRestart.allocID) - } - if expected := 200 * time.Millisecond; checkUpd.checkRestart.timeLimit != expected { - t.Fatalf("expected check's time limit to be %v but found %v", expected, checkUpd.checkRestart.timeLimit) - } - - // Make a change which will register a new service - ctx.Workload.Services[0].Name = "taskname-service2" - ctx.Workload.Services[0].Tags[0] = "tag3" - if err := ctx.ServiceClient.RegisterWorkload(ctx.Workload); err != nil { - t.Fatalf("unexpected error registering task: %v", err) - } - - // Assert check update is pending - if n := len(ctx.ServiceClient.checkWatcher.checkUpdateCh); n != 1 { - t.Fatalf("expected 1 check restart update but found %d", n) - } - - // Assert the check update's id has changed - checkUpd2 := <-ctx.ServiceClient.checkWatcher.checkUpdateCh - if checkUpd.checkID == checkUpd2.checkID { - t.Fatalf("expected new check update to have a new ID both both have: %q", checkUpd.checkID) - } - - // Make sure changes don't take affect until sync() is called (since - // Run() isn't running) - if n := len(ctx.FakeConsul.services["default"]); n != 1 { - t.Fatalf("expected 1 service but found %d:\n%#v", n, ctx.FakeConsul.services["default"]) - } - for _, v := range ctx.FakeConsul.services["default"] { - if reflect.DeepEqual(v.Tags, ctx.Workload.Services[0].Tags) { - t.Errorf("expected Tags to differ, changes applied before sync()") - } - } - - // Now sync() and re-check for the applied updates - if err := ctx.syncOnce(syncNewOps); err != nil { - t.Fatalf("unexpected error syncing task: %v", err) - } - if n := len(ctx.FakeConsul.services["default"]); n != 2 { - t.Fatalf("expected 2 services but found %d:\n%#v", n, ctx.FakeConsul.services["default"]) - } - found := false - for _, v := range ctx.FakeConsul.services["default"] { - if v.Name == ctx.Workload.Services[0].Name { - if found { - t.Fatalf("found new service name %q twice", v.Name) - } - found = true - if !reflect.DeepEqual(v.Tags, ctx.Workload.Services[0].Tags) { - 
t.Errorf("expected Tags=%v != %v", ctx.Workload.Services[0].Tags, v.Tags) - } - } - } - if !found { - t.Fatalf("did not find new service %q", ctx.Workload.Services[0].Name) - } - - // Remove the new task - ctx.ServiceClient.RemoveWorkload(ctx.Workload) - if err := ctx.syncOnce(syncNewOps); err != nil { - t.Fatalf("unexpected error syncing task: %v", err) - } - if n := len(ctx.FakeConsul.services["default"]); n != 1 { - t.Fatalf("expected 1 service but found %d:\n%#v", n, ctx.FakeConsul.services["default"]) - } - for _, v := range ctx.FakeConsul.services["default"] { - if v.Name != "taskname-service" { - t.Errorf("expected original task to survive not %q", v.Name) - } - } - - // Assert check update is pending - if n := len(ctx.ServiceClient.checkWatcher.checkUpdateCh); n != 1 { - t.Fatalf("expected 1 check restart update but found %d", n) - } - - // Assert the check update's id is correct and that it's a removal - checkUpd3 := <-ctx.ServiceClient.checkWatcher.checkUpdateCh - if checkUpd2.checkID != checkUpd3.checkID { - t.Fatalf("expected checkid %q but found %q", checkUpd2.checkID, checkUpd3.checkID) - } - if !checkUpd3.remove { - t.Fatalf("expected check watch removal update but found: %#v", checkUpd3) - } -} - // TestConsul_ShutdownOK tests the ok path for the shutdown logic in // ServiceClient. func TestConsul_ShutdownOK(t *testing.T) { diff --git a/nomad/structs/services.go b/nomad/structs/services.go index 284c88319..04a03b088 100644 --- a/nomad/structs/services.go +++ b/nomad/structs/services.go @@ -47,7 +47,10 @@ const ( minCheckTimeout = 1 * time.Second ) -// ServiceCheck represents the Consul health check. +// ServiceCheck represents a Nomad or Consul service health check. +// +// The fields available depend on the service provider the check is being +// registered into. type ServiceCheck struct { Name string // Name of the check, defaults to a generated label Type string // Type of the check - tcp, http, docker and script