diff --git a/e2e/events/events.go b/e2e/events/events.go index 7f8015e1d..4840b070d 100644 --- a/e2e/events/events.go +++ b/e2e/events/events.go @@ -182,3 +182,74 @@ func (tc *EventsTest) TestBlockedEvalEvents(f *framework.F) { require.NoError(t, e) }) } + +// TestStartIndex applies a job, then connects to the stream with a start +// index to verify that the events from before the job are not included. +func (tc *EventsTest) TestStartIndex(f *framework.F) { + t := f.T() + + nomadClient := tc.Nomad() + events := nomadClient.EventStream() + + uuid := uuid.Generate() + jobID := fmt.Sprintf("deployment-%s", uuid[0:8]) + jobID2 := fmt.Sprintf("deployment2-%s", uuid[0:8]) + tc.jobIDs = append(tc.jobIDs, jobID, jobID2) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // register job + err := e2eutil.Register(jobID, "events/input/initial.nomad") + require.NoError(t, err) + job, _, err := nomadClient.Jobs().Info(jobID, nil) + require.NoError(t, err) + startIndex := *job.JobModifyIndex + 1 + + topics := map[api.Topic][]string{ + "Job": {"*"}, + } + + // starting at Job.ModifyIndex + 1, the next (and only) JobRegistered event that we see + // should be from a different job registration + streamCh, err := events.Stream(ctx, topics, startIndex, nil) + require.NoError(t, err) + + var jobEvents []api.Event + // gather job register events + go func() { + for { + select { + case <-ctx.Done(): + return + case event := <-streamCh: + if event.IsHeartbeat() { + continue + } + jobEvents = append(jobEvents, event.Events...) + } + } + }() + + // new job (to make sure we get a JobRegistered event) + err = e2eutil.Register(jobID2, "events/input/deploy.nomad") + require.NoError(t, err) + + // ensure there is a deployment promotion event + foundUnexpected := false + testutil.WaitForResult(func() (bool, error) { + for _, e := range jobEvents { + if e.Type == "JobRegistered" { + if e.Index <= startIndex { + foundUnexpected = true + } + if e.Index >= startIndex { + return true, nil + } + } + } + return false, fmt.Errorf("expected to receive JobRegistered event for index at least %v", startIndex) + }, func(e error) { + f.NoError(e) + }) + require.False(t, foundUnexpected, "found events from earlier-than-expected indices") +}