use Events to wrap index and events, store in events table

This commit is contained in:
Drew Bailey
2020-10-04 15:12:35 -04:00
parent 47d1a33eb5
commit 004f634868
40 changed files with 949 additions and 266 deletions

View File

@@ -13,7 +13,6 @@ import (
"github.com/docker/docker/pkg/ioutils"
"github.com/hashicorp/go-msgpack/codec"
"github.com/hashicorp/nomad/nomad/stream"
"github.com/hashicorp/nomad/nomad/structs"
)
@@ -134,12 +133,12 @@ func (s *HTTPServer) EventStream(resp http.ResponseWriter, req *http.Request) (i
return nil, codedErr
}
func parseEventTopics(query url.Values) (map[stream.Topic][]string, error) {
func parseEventTopics(query url.Values) (map[structs.Topic][]string, error) {
raw, ok := query["topic"]
if !ok {
return allTopics(), nil
}
topics := make(map[stream.Topic][]string)
topics := make(map[structs.Topic][]string)
for _, topic := range raw {
k, v, err := parseTopic(topic)
@@ -147,10 +146,10 @@ func parseEventTopics(query url.Values) (map[stream.Topic][]string, error) {
return nil, fmt.Errorf("error parsing topics: %w", err)
}
if topics[stream.Topic(k)] == nil {
topics[stream.Topic(k)] = []string{v}
if topics[structs.Topic(k)] == nil {
topics[structs.Topic(k)] = []string{v}
} else {
topics[stream.Topic(k)] = append(topics[stream.Topic(k)], v)
topics[structs.Topic(k)] = append(topics[structs.Topic(k)], v)
}
}
return topics, nil
@@ -164,6 +163,6 @@ func parseTopic(topic string) (string, string, error) {
return parts[0], parts[1], nil
}
func allTopics() map[stream.Topic][]string {
return map[stream.Topic][]string{"*": {"*"}}
func allTopics() map[structs.Topic][]string {
return map[structs.Topic][]string{"*": {"*"}}
}

View File

@@ -3,6 +3,7 @@ package agent
import (
"context"
"fmt"
"github.com/hashicorp/nomad/nomad/structs"
"net/http"
"net/http/httptest"
"net/url"
@@ -10,7 +11,6 @@ import (
"testing"
"time"
"github.com/hashicorp/nomad/nomad/stream"
"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -38,7 +38,7 @@ func TestEventStream(t *testing.T) {
pub, err := s.Agent.server.State().EventPublisher()
require.NoError(t, err)
pub.Publish(100, []stream.Event{{Payload: testEvent{ID: "123"}}})
pub.Publish(structs.Events{Index: 100, Events: []structs.Event{{Payload: testEvent{ID: "123"}}}})
testutil.WaitForResult(func() (bool, error) {
got := resp.Body.String()
@@ -72,20 +72,20 @@ func TestEventStream_QueryParse(t *testing.T) {
cases := []struct {
desc string
query string
want map[stream.Topic][]string
want map[structs.Topic][]string
wantErr bool
}{
{
desc: "all topics and keys specified",
query: "?topic=*:*",
want: map[stream.Topic][]string{
want: map[structs.Topic][]string{
"*": {"*"},
},
},
{
desc: "all topics and keys inferred",
query: "",
want: map[stream.Topic][]string{
want: map[structs.Topic][]string{
"*": {"*"},
},
},
@@ -102,14 +102,14 @@ func TestEventStream_QueryParse(t *testing.T) {
{
desc: "single topic and key",
query: "?topic=NodeDrain:*",
want: map[stream.Topic][]string{
want: map[structs.Topic][]string{
"NodeDrain": {"*"},
},
},
{
desc: "single topic multiple keys",
query: "?topic=NodeDrain:*&topic=NodeDrain:3caace09-f1f4-4d23-b37a-9ab5eb75069d",
want: map[stream.Topic][]string{
want: map[structs.Topic][]string{
"NodeDrain": {
"*",
"3caace09-f1f4-4d23-b37a-9ab5eb75069d",
@@ -119,7 +119,7 @@ func TestEventStream_QueryParse(t *testing.T) {
{
desc: "multiple topics",
query: "?topic=NodeRegister:*&topic=NodeDrain:3caace09-f1f4-4d23-b37a-9ab5eb75069d",
want: map[stream.Topic][]string{
want: map[structs.Topic][]string{
"NodeDrain": {
"3caace09-f1f4-4d23-b37a-9ab5eb75069d",
},