Merge pull request #9419 from hashicorp/api-event-stream-index

events: API event stream client should the index flag
This commit is contained in:
Chris Baker
2020-11-23 07:58:16 -06:00
committed by GitHub
4 changed files with 83 additions and 1 deletions

View File

@@ -44,8 +44,9 @@ __BACKWARDS INCOMPATIBILITIES:__
BUG FIXES:
* agent (Enterprise): Fixed a bug where audit logging caused websocket and streaming http endpoints to fail [[GH-9319](https://github.com/hashicorp/nomad/issues/9319)]
* api: Fixed a bug where AllocatedResources contained increasingly duplicated ports [[GH-9368](https://github.com/hashicorp/nomad/issues/9368)]
* api: Fixed a bug where the event stream client didn't pass the index query parameters [[GH-9419](https://github.com/hashicorp/nomad/issues/9419)]
* core: Fixed a bug where ACL handling prevented cross-namespace allocation listing [[GH-9278](https://github.com/hashicorp/nomad/issues/9278)]
* core: Fixed a bug where AllocatedResources contained increasingly duplicated ports [[GH-9368](https://github.com/hashicorp/nomad/issues/9368)]
* core: Fixed a bug where scaling policy filtering would ignore type query if job query was present [[GH-9312](https://github.com/hashicorp/nomad/issues/9312)]
* core: Fixed a bug where a request to scale a job would fail if the job was not in the default namespace. [[GH-9296](https://github.com/hashicorp/nomad/pull/9296)]
* core: Fixed a bug where blocking queries would not include the query's maximum wait time when calculating whether it was safe to retry. [[GH-8921](https://github.com/hashicorp/nomad/issues/8921)]

View File

@@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"strconv"
"time"
"github.com/mitchellh/mapstructure"
@@ -141,6 +142,10 @@ func (e *EventStream) Stream(ctx context.Context, topics map[Topic][]string, ind
return nil, err
}
q = q.WithContext(ctx)
if q.Params == nil {
q.Params = map[string]string{}
}
q.Params["index"] = strconv.FormatUint(index, 10)
r.setQueryOptions(q)
// Build topic query params

View File

@@ -182,3 +182,74 @@ func (tc *EventsTest) TestBlockedEvalEvents(f *framework.F) {
require.NoError(t, e)
})
}
// TestStartIndex applies a job, then connects to the stream with a start
// index to verify that the events from before the job are not included.
func (tc *EventsTest) TestStartIndex(f *framework.F) {
t := f.T()
nomadClient := tc.Nomad()
events := nomadClient.EventStream()
uuid := uuid.Generate()
jobID := fmt.Sprintf("deployment-%s", uuid[0:8])
jobID2 := fmt.Sprintf("deployment2-%s", uuid[0:8])
tc.jobIDs = append(tc.jobIDs, jobID, jobID2)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// register job
err := e2eutil.Register(jobID, "events/input/initial.nomad")
require.NoError(t, err)
job, _, err := nomadClient.Jobs().Info(jobID, nil)
require.NoError(t, err)
startIndex := *job.JobModifyIndex + 1
topics := map[api.Topic][]string{
"Job": {"*"},
}
// starting at Job.ModifyIndex + 1, the next (and only) JobRegistered event that we see
// should be from a different job registration
streamCh, err := events.Stream(ctx, topics, startIndex, nil)
require.NoError(t, err)
var jobEvents []api.Event
// gather job register events
go func() {
for {
select {
case <-ctx.Done():
return
case event := <-streamCh:
if event.IsHeartbeat() {
continue
}
jobEvents = append(jobEvents, event.Events...)
}
}
}()
// new job (to make sure we get a JobRegistered event)
err = e2eutil.Register(jobID2, "events/input/deploy.nomad")
require.NoError(t, err)
// ensure there is a deployment promotion event
foundUnexpected := false
testutil.WaitForResult(func() (bool, error) {
for _, e := range jobEvents {
if e.Type == "JobRegistered" {
if e.Index <= startIndex {
foundUnexpected = true
}
if e.Index >= startIndex {
return true, nil
}
}
}
return false, fmt.Errorf("expected to receive JobRegistered event for index at least %v", startIndex)
}, func(e error) {
f.NoError(e)
})
require.False(t, foundUnexpected, "found events from earlier-than-expected indices")
}

View File

@@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"strconv"
"time"
"github.com/mitchellh/mapstructure"
@@ -141,6 +142,10 @@ func (e *EventStream) Stream(ctx context.Context, topics map[Topic][]string, ind
return nil, err
}
q = q.WithContext(ctx)
if q.Params == nil {
q.Params = map[string]string{}
}
q.Params["index"] = strconv.FormatUint(index, 10)
r.setQueryOptions(q)
// Build topic query params