Event Stream API/RPC (#8947)

This Commit adds an /v1/events/stream endpoint to stream events from.

The stream framer has been updated to include a SendFull method which
does not fragment the data between multiple frames. This essentially
treats the stream framer as a envelope to adhere to the stream framer
interface in the UI.

If the `encode` query parameter is omitted events will be streamed as
newline delimted JSON.
This commit is contained in:
Drew Bailey
2020-09-28 10:13:10 -04:00
parent 8bd15b534f
commit b825ba3bf1
18 changed files with 1129 additions and 31 deletions

View File

@@ -0,0 +1,169 @@
package agent
import (
"bytes"
"context"
"fmt"
"io"
"net"
"net/http"
"net/url"
"strconv"
"strings"
"github.com/docker/docker/pkg/ioutils"
"github.com/hashicorp/go-msgpack/codec"
"github.com/hashicorp/nomad/nomad/stream"
"github.com/hashicorp/nomad/nomad/structs"
)
func (s *HTTPServer) EventStream(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
query := req.URL.Query()
indexStr := query.Get("index")
if indexStr == "" {
indexStr = "0"
}
index, err := strconv.Atoi(indexStr)
if err != nil {
return nil, CodedError(400, fmt.Sprintf("Unable to parse index: %v", err))
}
topics, err := parseEventTopics(query)
if err != nil {
return nil, CodedError(400, fmt.Sprintf("Invalid topic query: %v", err))
}
args := &structs.EventStreamRequest{
Topics: topics,
Index: index,
}
resp.Header().Set("Content-Type", "application/json")
resp.Header().Set("Cache-Control", "no-cache")
s.parse(resp, req, &args.QueryOptions.Region, &args.QueryOptions)
// Make the RPC
var handler structs.StreamingRpcHandler
var handlerErr error
if server := s.agent.Server(); server != nil {
handler, handlerErr = server.StreamingRpcHandler("Event.Stream")
} else if client := s.agent.Client(); client != nil {
handler, handlerErr = client.RemoteStreamingRpcHandler("Event.Stream")
} else {
handlerErr = fmt.Errorf("misconfigured connection")
}
if handlerErr != nil {
return nil, CodedError(500, handlerErr.Error())
}
httpPipe, handlerPipe := net.Pipe()
decoder := codec.NewDecoder(httpPipe, structs.MsgpackHandle)
encoder := codec.NewEncoder(httpPipe, structs.MsgpackHandle)
// Create a goroutine that closes the pipe if the connection closes
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
go func() {
<-ctx.Done()
httpPipe.Close()
}()
// Create an output that gets flushed on every write
output := ioutils.NewWriteFlusher(resp)
// create an error channel to handle errors
errCh := make(chan HTTPCodedError, 2)
go func() {
defer cancel()
// Send the request
if err := encoder.Encode(args); err != nil {
errCh <- CodedError(500, err.Error())
return
}
for {
select {
case <-ctx.Done():
errCh <- nil
return
default:
}
// Decode the response
var res structs.EventStreamWrapper
if err := decoder.Decode(&res); err != nil {
if err == io.EOF || err == io.ErrClosedPipe {
return
}
errCh <- CodedError(500, err.Error())
return
}
decoder.Reset(httpPipe)
if err := res.Error; err != nil {
if err.Code != nil {
errCh <- CodedError(int(*err.Code), err.Error())
return
}
}
// Flush json entry to response
if _, err := io.Copy(output, bytes.NewReader(res.Event.Data)); err != nil {
errCh <- CodedError(500, err.Error())
return
}
}
}()
// invoke handler
handler(handlerPipe)
cancel()
codedErr := <-errCh
if codedErr != nil &&
(codedErr == io.EOF ||
strings.Contains(codedErr.Error(), io.ErrClosedPipe.Error())) {
codedErr = nil
}
return nil, codedErr
}
func parseEventTopics(query url.Values) (map[stream.Topic][]string, error) {
raw, ok := query["topic"]
if !ok {
return allTopics(), nil
}
topics := make(map[stream.Topic][]string)
for _, topic := range raw {
k, v, err := parseTopic(topic)
if err != nil {
return nil, fmt.Errorf("error parsing topics: %w", err)
}
if topics[stream.Topic(k)] == nil {
topics[stream.Topic(k)] = []string{v}
} else {
topics[stream.Topic(k)] = append(topics[stream.Topic(k)], v)
}
}
return topics, nil
}
func parseTopic(topic string) (string, string, error) {
parts := strings.Split(topic, ":")
if len(parts) != 2 {
return "", "", fmt.Errorf("Invalid key value pair for topic, topic: %s", topic)
}
return parts[0], parts[1], nil
}
func allTopics() map[stream.Topic][]string {
return map[stream.Topic][]string{"*": {"*"}}
}

View File

@@ -0,0 +1,148 @@
package agent
import (
"context"
"fmt"
"net/http"
"net/http/httptest"
"net/url"
"strings"
"testing"
"time"
"github.com/hashicorp/nomad/nomad/stream"
"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
type testEvent struct {
ID string
}
func TestEventStream(t *testing.T) {
t.Parallel()
httpTest(t, nil, func(s *TestAgent) {
ctx, cancel := context.WithCancel(context.Background())
req, err := http.NewRequestWithContext(ctx, "GET", "/v1/event/stream", nil)
require.Nil(t, err)
resp := httptest.NewRecorder()
respErrCh := make(chan error)
go func() {
_, err = s.Server.EventStream(resp, req)
respErrCh <- err
assert.NoError(t, err)
}()
pub, err := s.Agent.server.State().EventPublisher()
require.NoError(t, err)
pub.Publish(100, []stream.Event{{Payload: testEvent{ID: "123"}}})
testutil.WaitForResult(func() (bool, error) {
got := resp.Body.String()
want := `{"ID":"123"}`
if strings.Contains(got, want) {
return true, nil
}
return false, fmt.Errorf("missing expected json, got: %v, want: %v", got, want)
}, func(err error) {
cancel()
require.Fail(t, err.Error())
})
// wait for response to close to prevent race between subscription
// shutdown and server shutdown returning subscription closed by server err
// resp.Close()
cancel()
select {
case err := <-respErrCh:
require.Nil(t, err)
case <-time.After(1 * time.Second):
require.Fail(t, "waiting for request cancellation")
}
})
}
func TestEventStream_QueryParse(t *testing.T) {
t.Parallel()
cases := []struct {
desc string
query string
want map[stream.Topic][]string
wantErr bool
}{
{
desc: "all topics and keys specified",
query: "?topic=*:*",
want: map[stream.Topic][]string{
"*": []string{"*"},
},
},
{
desc: "all topics and keys inferred",
query: "",
want: map[stream.Topic][]string{
"*": []string{"*"},
},
},
{
desc: "invalid key value formatting",
query: "?topic=NodeDrain:*:*",
wantErr: true,
},
{
desc: "invalid key value formatting no value",
query: "?topic=NodeDrain",
wantErr: true,
},
{
desc: "single topic and key",
query: "?topic=NodeDrain:*",
want: map[stream.Topic][]string{
"NodeDrain": []string{"*"},
},
},
{
desc: "single topic multiple keys",
query: "?topic=NodeDrain:*&topic=NodeDrain:3caace09-f1f4-4d23-b37a-9ab5eb75069d",
want: map[stream.Topic][]string{
"NodeDrain": []string{
"*",
"3caace09-f1f4-4d23-b37a-9ab5eb75069d",
},
},
},
{
desc: "multiple topics",
query: "?topic=NodeRegister:*&topic=NodeDrain:3caace09-f1f4-4d23-b37a-9ab5eb75069d",
want: map[stream.Topic][]string{
"NodeDrain": []string{
"3caace09-f1f4-4d23-b37a-9ab5eb75069d",
},
"NodeRegister": []string{
"*",
},
},
},
}
for _, tc := range cases {
t.Run(tc.desc, func(t *testing.T) {
raw := fmt.Sprintf("http://localhost:80/v1/events%s", tc.query)
req, err := url.Parse(raw)
require.NoError(t, err)
got, err := parseEventTopics(req.Query())
if tc.wantErr {
require.Error(t, err)
return
}
require.NoError(t, err)
require.Equal(t, tc.want, got)
})
}
}

View File

@@ -326,6 +326,8 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) {
s.mux.HandleFunc("/v1/operator/scheduler/configuration", s.wrap(s.OperatorSchedulerConfiguration))
s.mux.HandleFunc("/v1/event/stream", s.wrap(s.EventStream))
if uiEnabled {
s.mux.Handle("/ui/", http.StripPrefix("/ui/", s.handleUI(http.FileServer(&UIAssetWrapper{FileSystem: assetFS()}))))
} else {

View File

@@ -78,6 +78,10 @@ type Config struct {
// in the absence of ACLs
EnableDebug bool
// EnableEventPublisher is used to enable or disable the state stores
// event publishing
EnableEventPublisher bool
// LogOutput is the location to write logs to. If this is not set,
// logs will go to stderr.
LogOutput io.Writer
@@ -413,6 +417,7 @@ func DefaultConfig() *Config {
ReplicationBackoff: 30 * time.Second,
SentinelGCInterval: 30 * time.Second,
LicenseConfig: &LicenseConfig{},
EnableEventPublisher: true,
AutopilotConfig: &structs.AutopilotConfig{
CleanupDeadServers: true,
LastContactThreshold: 200 * time.Millisecond,

213
nomad/event_endpoint.go Normal file
View File

@@ -0,0 +1,213 @@
package nomad
import (
"context"
"io"
"time"
"github.com/hashicorp/go-msgpack/codec"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/stream"
"github.com/hashicorp/nomad/nomad/structs"
)
type Event struct {
srv *Server
}
func (e *Event) register() {
e.srv.streamingRpcs.Register("Event.Stream", e.stream)
}
func (e *Event) stream(conn io.ReadWriteCloser) {
defer conn.Close()
var args structs.EventStreamRequest
decoder := codec.NewDecoder(conn, structs.MsgpackHandle)
encoder := codec.NewEncoder(conn, structs.MsgpackHandle)
if err := decoder.Decode(&args); err != nil {
handleJsonResultError(err, helper.Int64ToPtr(500), encoder)
return
}
// forward to appropriate region
if args.Region != e.srv.config.Region {
err := e.forwardStreamingRPC(args.Region, "Event.Stream", args, conn)
if err != nil {
handleJsonResultError(err, helper.Int64ToPtr(500), encoder)
}
return
}
// ACL check
// TODO(drew) ACL checks need to be per topic
// All Events Management
// System Events Management
// Node Events NamespaceCapabilityReadEvents
// Job/Alloc Events NamespaceCapabilityReadEvents
if aclObj, err := e.srv.ResolveToken(args.AuthToken); err != nil {
handleJsonResultError(err, nil, encoder)
return
} else if aclObj != nil && !aclObj.IsManagement() {
handleJsonResultError(structs.ErrPermissionDenied, helper.Int64ToPtr(403), encoder)
return
}
// TODO(drew) handle streams without ACLS
reqToken := args.AuthToken
if reqToken == "" {
// generate a random request token
reqToken = uuid.Generate()
}
subReq := &stream.SubscribeRequest{
Token: reqToken,
Topics: args.Topics,
Index: uint64(args.Index),
}
publisher, err := e.srv.State().EventPublisher()
if err != nil {
handleJsonResultError(err, helper.Int64ToPtr(500), encoder)
return
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// start subscription to publisher
subscription, err := publisher.Subscribe(subReq)
if err != nil {
handleJsonResultError(err, helper.Int64ToPtr(500), encoder)
return
}
defer subscription.Unsubscribe()
ndJsonCh := make(chan *stream.NDJson)
errCh := make(chan error)
jsonStream := stream.NewNDJsonStream(ndJsonCh, 30*time.Second)
jsonStream.Run(ctx)
// goroutine to detect remote side closing
go func() {
if _, err := conn.Read(nil); err != nil {
// One end of the pipe explicitly closed, exit
cancel()
return
}
select {
case <-errCh:
case <-ctx.Done():
return
}
}()
go func() {
defer cancel()
LOOP:
for {
events, err := subscription.Next(ctx)
if err != nil {
select {
case errCh <- err:
case <-ctx.Done():
}
break LOOP
}
// Continue if there are no events
if events == nil {
continue
}
// Send each event as its own frame
for _, e := range events {
if err := jsonStream.Send(e); err != nil {
select {
case errCh <- err:
case <-ctx.Done():
}
break LOOP
}
}
}
}()
var streamErr error
OUTER:
for {
select {
case streamErr = <-errCh:
break OUTER
case <-ctx.Done():
break OUTER
case eventJSON, ok := <-ndJsonCh:
// check if ndjson may have been closed when an error occurred,
// check once more for an error.
if !ok {
select {
case streamErr = <-errCh:
// There was a pending error
default:
}
break OUTER
}
var resp structs.EventStreamWrapper
resp.Event = eventJSON
if err := encoder.Encode(resp); err != nil {
streamErr = err
break OUTER
}
encoder.Reset(conn)
}
}
if streamErr != nil {
handleJsonResultError(streamErr, helper.Int64ToPtr(500), encoder)
return
}
}
func (e *Event) forwardStreamingRPC(region string, method string, args interface{}, in io.ReadWriteCloser) error {
server, err := e.srv.findRegionServer(region)
if err != nil {
return err
}
return e.forwardStreamingRPCToServer(server, method, args, in)
}
func (e *Event) forwardStreamingRPCToServer(server *serverParts, method string, args interface{}, in io.ReadWriteCloser) error {
srvConn, err := e.srv.streamingRpc(server, method)
if err != nil {
return err
}
defer srvConn.Close()
outEncoder := codec.NewEncoder(srvConn, structs.MsgpackHandle)
if err := outEncoder.Encode(args); err != nil {
return err
}
structs.Bridge(in, srvConn)
return nil
}
// handleJsonResultError is a helper for sending an error with a potential
// error code. The transmission of the error is ignored if the error has been
// generated by the closing of the underlying transport.
func handleJsonResultError(err error, code *int64, encoder *codec.Encoder) {
// Nothing to do as the conn is closed
if err == io.EOF {
return
}
encoder.Encode(&structs.EventStreamWrapper{
Error: structs.NewRpcError(err, code),
})
}

View File

@@ -0,0 +1,295 @@
package nomad
import (
"encoding/json"
"fmt"
"io"
"net"
"strings"
"testing"
"time"
"github.com/hashicorp/go-msgpack/codec"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/stream"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/mitchellh/mapstructure"
"github.com/stretchr/testify/require"
)
func TestEventStream(t *testing.T) {
t.Parallel()
s1, cleanupS1 := TestServer(t, func(c *Config) {
c.EnableEventPublisher = true
})
defer cleanupS1()
// Create request for all topics and keys
req := structs.EventStreamRequest{
Topics: map[stream.Topic][]string{"*": []string{"*"}},
QueryOptions: structs.QueryOptions{
Region: s1.Region(),
},
}
handler, err := s1.StreamingRpcHandler("Event.Stream")
require.Nil(t, err)
p1, p2 := net.Pipe()
defer p1.Close()
defer p2.Close()
errCh := make(chan error)
streamMsg := make(chan *structs.EventStreamWrapper)
// invoke handler
go handler(p2)
// send request
go func() {
decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
for {
var msg structs.EventStreamWrapper
if err := decoder.Decode(&msg); err != nil {
if err == io.EOF || strings.Contains(err.Error(), "closed") {
return
}
errCh <- fmt.Errorf("error decoding: %w", err)
}
streamMsg <- &msg
}
}()
// retrieve publisher for server, send event
publisher, err := s1.State().EventPublisher()
require.NoError(t, err)
node := mock.Node()
publisher.Publish(uint64(1), []stream.Event{{Topic: "test", Payload: node}})
encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
require.Nil(t, encoder.Encode(req))
timeout := time.After(3 * time.Second)
OUTER:
for {
select {
case <-timeout:
t.Fatal("timeout waiting for event stream")
case err := <-errCh:
t.Fatal(err)
case msg := <-streamMsg:
if msg.Error != nil {
t.Fatalf("Got error: %v", msg.Error.Error())
}
// ignore heartbeat
if msg.Event == stream.NDJsonHeartbeat {
continue
}
var event stream.Event
err = json.Unmarshal(msg.Event.Data, &event)
require.NoError(t, err)
// decode fully to ensure we received expected out
var out structs.Node
cfg := &mapstructure.DecoderConfig{
Metadata: nil,
Result: &out,
}
dec, err := mapstructure.NewDecoder(cfg)
dec.Decode(event.Payload)
require.NoError(t, err)
require.Equal(t, node.ID, out.ID)
break OUTER
}
}
}
// TestEventStream_StreamErr asserts an error is returned when an event publisher
// closes its subscriptions
func TestEventStream_StreamErr(t *testing.T) {
t.Parallel()
s1, cleanupS1 := TestServer(t, func(c *Config) {
c.EnableEventPublisher = true
})
defer cleanupS1()
testutil.WaitForLeader(t, s1.RPC)
req := structs.EventStreamRequest{
Topics: map[stream.Topic][]string{"*": []string{"*"}},
QueryOptions: structs.QueryOptions{
Region: s1.Region(),
},
}
handler, err := s1.StreamingRpcHandler("Event.Stream")
require.Nil(t, err)
p1, p2 := net.Pipe()
defer p1.Close()
defer p2.Close()
errCh := make(chan error)
streamMsg := make(chan *structs.EventStreamWrapper)
go handler(p2)
go func() {
decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
for {
var msg structs.EventStreamWrapper
if err := decoder.Decode(&msg); err != nil {
if err == io.EOF || strings.Contains(err.Error(), "closed") {
return
}
errCh <- fmt.Errorf("error decoding: %w", err)
}
streamMsg <- &msg
}
}()
publisher, err := s1.State().EventPublisher()
require.NoError(t, err)
node := mock.Node()
publisher.Publish(uint64(1), []stream.Event{{Topic: "test", Payload: node}})
// send req
encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
require.Nil(t, encoder.Encode(req))
// stop the publisher to force an error on subscription side
s1.State().StopEventPublisher()
timeout := time.After(5 * time.Second)
OUTER:
for {
select {
case <-timeout:
t.Fatal("timeout waiting for event stream")
case err := <-errCh:
t.Fatal(err)
case msg := <-streamMsg:
if msg.Error == nil {
// race between error and receiving an event
// continue trying for error
continue
}
require.NotNil(t, msg.Error)
require.Contains(t, msg.Error.Error(), "subscription closed by server")
break OUTER
}
}
}
// TestEventStream_RegionForward tests event streaming from one server
// to another in a different region
func TestEventStream_RegionForward(t *testing.T) {
t.Parallel()
s1, cleanupS1 := TestServer(t, func(c *Config) {
c.EnableEventPublisher = true
})
defer cleanupS1()
s2, cleanupS2 := TestServer(t, func(c *Config) {
c.EnableEventPublisher = true
c.Region = "foo"
})
defer cleanupS2()
TestJoin(t, s1, s2)
// Create request targed for region foo
req := structs.EventStreamRequest{
Topics: map[stream.Topic][]string{"*": []string{"*"}},
QueryOptions: structs.QueryOptions{
Region: "foo",
},
}
// Query s1 handler
handler, err := s1.StreamingRpcHandler("Event.Stream")
require.Nil(t, err)
p1, p2 := net.Pipe()
defer p1.Close()
defer p2.Close()
errCh := make(chan error)
streamMsg := make(chan *structs.EventStreamWrapper)
go handler(p2)
go func() {
decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
for {
var msg structs.EventStreamWrapper
if err := decoder.Decode(&msg); err != nil {
if err == io.EOF || strings.Contains(err.Error(), "closed") {
return
}
errCh <- fmt.Errorf("error decoding: %w", err)
}
streamMsg <- &msg
}
}()
// publish with server 2
publisher, err := s2.State().EventPublisher()
require.NoError(t, err)
node := mock.Node()
publisher.Publish(uint64(1), []stream.Event{{Topic: "test", Payload: node}})
// send req
encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
require.Nil(t, encoder.Encode(req))
timeout := time.After(3 * time.Second)
OUTER:
for {
select {
case <-timeout:
t.Fatal("timeout waiting for event stream")
case err := <-errCh:
t.Fatal(err)
case msg := <-streamMsg:
if msg.Error != nil {
t.Fatalf("Got error: %v", msg.Error.Error())
}
if msg.Event == stream.NDJsonHeartbeat {
continue
}
var event stream.Event
err = json.Unmarshal(msg.Event.Data, &event)
require.NoError(t, err)
var out structs.Node
cfg := &mapstructure.DecoderConfig{
Metadata: nil,
Result: &out,
}
dec, err := mapstructure.NewDecoder(cfg)
dec.Decode(event.Payload)
require.NoError(t, err)
require.Equal(t, node.ID, out.ID)
break OUTER
}
}
}
// TODO(drew) acl test
func TestEventStream_ACL(t *testing.T) {
}

View File

@@ -126,14 +126,17 @@ type FSMConfig struct {
// Region is the region of the server embedding the FSM
Region string
EnableEventPublisher bool
}
// NewFSMPath is used to construct a new FSM with a blank state
func NewFSM(config *FSMConfig) (*nomadFSM, error) {
// Create a state store
sconfig := &state.StateStoreConfig{
Logger: config.Logger,
Region: config.Region,
Logger: config.Logger,
Region: config.Region,
EnablePublisher: config.EnableEventPublisher,
}
state, err := state.NewStateStore(sconfig)
if err != nil {
@@ -163,6 +166,7 @@ func NewFSM(config *FSMConfig) (*nomadFSM, error) {
// Close is used to cleanup resources associated with the FSM
func (n *nomadFSM) Close() error {
n.state.StopEventPublisher()
return nil
}

View File

@@ -275,6 +275,7 @@ type endpoints struct {
ACL *ACL
Scaling *Scaling
Enterprise *EnterpriseEndpoints
Event *Event
// Client endpoints
ClientStats *ClientStats
@@ -1162,6 +1163,9 @@ func (s *Server) setupRpcServer(server *rpc.Server, ctx *RPCContext) {
s.staticEndpoints.Agent = &Agent{srv: s}
s.staticEndpoints.Agent.register()
s.staticEndpoints.Event = &Event{srv: s}
s.staticEndpoints.Event.register()
}
// Register the static handlers
@@ -1207,11 +1211,12 @@ func (s *Server) setupRaft() error {
// Create the FSM
fsmConfig := &FSMConfig{
EvalBroker: s.evalBroker,
Periodic: s.periodicDispatcher,
Blocked: s.blockedEvals,
Logger: s.logger,
Region: s.Region(),
EvalBroker: s.evalBroker,
Periodic: s.periodicDispatcher,
Blocked: s.blockedEvals,
Logger: s.logger,
Region: s.Region(),
EnableEventPublisher: s.config.EnableEventPublisher,
}
var err error
s.fsm, err = NewFSM(fsmConfig)

View File

@@ -32,14 +32,14 @@ type Changes struct {
// changeTrackerDB is a thin wrapper around memdb.DB which enables TrackChanges on
// all write transactions. When the transaction is committed the changes are
// sent to the eventPublisher which will create and emit change events.
// sent to the EventPublisher which will create and emit change events.
type changeTrackerDB struct {
db *memdb.MemDB
publisher eventPublisher
publisher *stream.EventPublisher
processChanges func(ReadTxn, Changes) ([]stream.Event, error)
}
func NewChangeTrackerDB(db *memdb.MemDB, publisher eventPublisher, changesFn changeProcessor) *changeTrackerDB {
func NewChangeTrackerDB(db *memdb.MemDB, publisher *stream.EventPublisher, changesFn changeProcessor) *changeTrackerDB {
return &changeTrackerDB{
db: db,
publisher: publisher,
@@ -49,15 +49,7 @@ func NewChangeTrackerDB(db *memdb.MemDB, publisher eventPublisher, changesFn cha
type changeProcessor func(ReadTxn, Changes) ([]stream.Event, error)
type eventPublisher interface {
Publish(index uint64, events []stream.Event)
}
// noOpPublisher satisfies the eventPublisher interface and does nothing
type noOpPublisher struct{}
func (n *noOpPublisher) Publish(index uint64, events []stream.Event) {}
func noOpProcessChanges(ReadTxn, Changes) ([]stream.Event, error) { return []stream.Event{}, nil }
func noOpProcessChanges(ReadTxn, Changes) ([]stream.Event, error) { return []stream.Event{}, nil }
// ReadTxn returns a read-only transaction which behaves exactly the same as
// memdb.Txn

View File

@@ -93,10 +93,11 @@ func NewStateStore(config *StateStoreConfig) (*StateStore, error) {
publisher := stream.NewEventPublisher(ctx, stream.EventPublisherCfg{
EventBufferTTL: 1 * time.Hour,
EventBufferSize: 250,
Logger: config.Logger,
})
s.db = NewChangeTrackerDB(db, publisher, processDBChanges)
} else {
s.db = NewChangeTrackerDB(db, &noOpPublisher{}, processDBChanges)
s.db = NewChangeTrackerDB(db, nil, noOpProcessChanges)
}
// Initialize the state store with required enterprise objects
@@ -107,6 +108,13 @@ func NewStateStore(config *StateStoreConfig) (*StateStore, error) {
return s, nil
}
func (s *StateStore) EventPublisher() (*stream.EventPublisher, error) {
if s.db.publisher == nil {
return nil, fmt.Errorf("EventPublisher not configured")
}
return s.db.publisher, nil
}
// Config returns the state store configuration.
func (s *StateStore) Config() *StateStoreConfig {
return s.config
@@ -123,7 +131,8 @@ func (s *StateStore) Snapshot() (*StateSnapshot, error) {
config: s.config,
}
store.db = NewChangeTrackerDB(memDBSnap, &noOpPublisher{}, noOpProcessChanges)
// Create a new change tracker DB that does not publish or track changes
store.db = NewChangeTrackerDB(memDBSnap, nil, noOpProcessChanges)
snap := &StateSnapshot{
StateStore: store,

View File

@@ -256,7 +256,7 @@ func (i *bufferItem) Next(ctx context.Context, forceClose <-chan struct{}) (*buf
// state change (chan nil) as that's not threadsafe but detecting close is.
select {
case <-ctx.Done():
return nil, ctx.Err()
return nil, fmt.Errorf("waiting for next event: %w", ctx.Err())
case <-forceClose:
return nil, fmt.Errorf("subscription closed")
case <-i.link.ch:

View File

@@ -15,6 +15,7 @@ const (
type EventPublisherCfg struct {
EventBufferSize int64
EventBufferTTL time.Duration
Logger hclog.Logger
}
type EventPublisher struct {
@@ -56,8 +57,13 @@ func NewEventPublisher(ctx context.Context, cfg EventPublisherCfg) *EventPublish
cfg.EventBufferTTL = 1 * time.Hour
}
if cfg.Logger == nil {
cfg.Logger = hclog.NewNullLogger()
}
buffer := newEventBuffer(cfg.EventBufferSize, cfg.EventBufferTTL)
e := &EventPublisher{
logger: cfg.Logger.Named("event_publisher"),
eventBuf: buffer,
publishCh: make(chan changeEvents),
subscriptions: &subscriptions{
@@ -95,7 +101,12 @@ func (e *EventPublisher) Subscribe(req *SubscribeRequest) (*Subscription, error)
e.logger.Warn("requested index no longer in buffer", "requsted", int(req.Index), "closest", int(head.Index))
}
sub := newSubscription(req, head, func() {})
// Empty head so that calling Next on sub
start := newBufferItem(req.Index, []Event{})
start.link.next.Store(head)
close(start.link.ch)
sub := newSubscription(req, start, e.subscriptions.unsubscribe(req))
e.subscriptions.add(req, sub)
return sub, nil

114
nomad/stream/ndjson.go Normal file
View File

@@ -0,0 +1,114 @@
package stream
import (
"bytes"
"context"
"encoding/json"
"fmt"
"sync"
"time"
)
var (
// NDJsonHeartbeat is the NDJson to send as a heartbeat
// Avoids creating many heartbeat instances
NDJsonHeartbeat = &NDJson{Data: []byte("{}\n")}
)
// NDJsonStream is used to send new line delimited JSON and heartbeats
// to a destination (out channel)
type NDJsonStream struct {
out chan<- *NDJson
// heartbeat is the interval to send heartbeat messages to keep a connection
// open.
heartbeat *time.Ticker
publishCh chan NDJson
exitCh chan struct{}
l sync.Mutex
running bool
}
// NNDJson is a wrapper for a Newline Delimited JSON object
type NDJson struct {
Data []byte
}
// NewNNewNDJsonStream creates a new NDJson stream that will output NDJson structs
// to the passed output channel
func NewNDJsonStream(out chan<- *NDJson, heartbeat time.Duration) *NDJsonStream {
return &NDJsonStream{
out: out,
heartbeat: time.NewTicker(heartbeat),
exitCh: make(chan struct{}),
publishCh: make(chan NDJson),
}
}
// Run starts a long lived goroutine that handles sending
// heartbeats and processed json objects to the streams out channel as well
func (n *NDJsonStream) Run(ctx context.Context) {
n.l.Lock()
if n.running {
return
}
n.running = true
n.l.Unlock()
go n.run(ctx)
}
func (n *NDJsonStream) run(ctx context.Context) {
defer func() {
n.l.Lock()
n.running = false
n.l.Unlock()
close(n.exitCh)
}()
for {
select {
case <-ctx.Done():
return
case msg := <-n.publishCh:
n.out <- msg.Copy()
case <-n.heartbeat.C:
// Send a heartbeat frame
select {
case n.out <- NDJsonHeartbeat:
case <-ctx.Done():
return
}
}
}
}
// Send encodes an object into Newline delimited json. An error is returned
// if json encoding fails or if the stream is no longer running.
func (n *NDJsonStream) Send(obj interface{}) error {
n.l.Lock()
defer n.l.Unlock()
buf := bytes.NewBuffer(nil)
if err := json.NewEncoder(buf).Encode(obj); err != nil {
return fmt.Errorf("marshaling json for stream: %w", err)
}
select {
case n.publishCh <- NDJson{Data: buf.Bytes()}:
case <-n.exitCh:
return fmt.Errorf("stream is no longer running")
}
return nil
}
func (j *NDJson) Copy() *NDJson {
n := new(NDJson)
*n = *j
n.Data = make([]byte, len(j.Data))
copy(n.Data, j.Data)
return n
}

View File

@@ -0,0 +1,72 @@
package stream
import (
"bytes"
"context"
"testing"
"time"
"github.com/stretchr/testify/require"
)
type testObj struct {
Name string `json:"name"`
}
func TestNDJson(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
out := make(chan *NDJson)
s := NewNDJsonStream(out, 1*time.Second)
s.Run(ctx)
require.NoError(t, s.Send(testObj{Name: "test"}))
out1 := <-out
var expected bytes.Buffer
expected.Write([]byte(`{"name":"test"}`))
expected.Write([]byte("\n"))
require.Equal(t, expected.Bytes(), out1.Data)
select {
case _ = <-out:
t.Fatalf("Did not expect another message")
case <-time.After(100 * time.Millisecond):
}
}
func TestNDJson_Send_After_Stop(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
out := make(chan *NDJson)
s := NewNDJsonStream(out, 1*time.Second)
s.Run(ctx)
// stop the stream
cancel()
time.Sleep(10 * time.Millisecond)
require.Error(t, s.Send(testObj{}))
}
func TestNDJson_HeartBeat(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
out := make(chan *NDJson)
s := NewNDJsonStream(out, 10*time.Millisecond)
s.Run(ctx)
heartbeat := <-out
require.Equal(t, NDJsonHeartbeat, heartbeat)
}

View File

@@ -21,10 +21,6 @@ const (
// closed. The client should Unsubscribe, then re-Subscribe.
var ErrSubscriptionClosed = errors.New("subscription closed by server, client should resubscribe")
// type Subscriber struct {
// logger hclog.Logger
// }
type Subscription struct {
// state is accessed atomically 0 means open, 1 means closed with reload
state uint32
@@ -104,8 +100,15 @@ func filter(req *SubscribeRequest, events []Event) []Event {
var count int
for _, e := range events {
if _, ok := req.Topics[e.Topic]; ok {
for _, k := range req.Topics[e.Topic] {
_, allTopics := req.Topics[AllKeys]
if _, ok := req.Topics[e.Topic]; ok || allTopics {
var keys []string
if allTopics {
keys = req.Topics[AllKeys]
} else {
keys = req.Topics[e.Topic]
}
for _, k := range keys {
if e.Key == k || k == AllKeys {
count++
}
@@ -124,8 +127,15 @@ func filter(req *SubscribeRequest, events []Event) []Event {
// Return filtered events
result := make([]Event, 0, count)
for _, e := range events {
if _, ok := req.Topics[e.Topic]; ok {
for _, k := range req.Topics[e.Topic] {
_, allTopics := req.Topics[AllKeys]
if _, ok := req.Topics[e.Topic]; ok || allTopics {
var keys []string
if allTopics {
keys = req.Topics[AllKeys]
} else {
keys = req.Topics[e.Topic]
}
for _, k := range keys {
if e.Key == k || k == AllKeys {
result = append(result, e)
}

View File

@@ -10,6 +10,22 @@ func TestSubscription(t *testing.T) {
}
func TestFilter_AllTopics(t *testing.T) {
events := make([]Event, 0, 5)
events = append(events, Event{Topic: "Test", Key: "One"}, Event{Topic: "Test", Key: "Two"})
req := &SubscribeRequest{
Topics: map[Topic][]string{
"*": []string{"*"},
},
}
actual := filter(req, events)
require.Equal(t, events, actual)
// ensure new array was not allocated
require.Equal(t, cap(actual), 5)
}
func TestFilter_AllKeys(t *testing.T) {
events := make([]Event, 0, 5)
events = append(events, Event{Topic: "Test", Key: "One"}, Event{Topic: "Test", Key: "Two"})

View File

@@ -39,6 +39,7 @@ import (
"github.com/hashicorp/nomad/helper/constraints/semver"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/lib/kheap"
"github.com/hashicorp/nomad/nomad/stream"
psstructs "github.com/hashicorp/nomad/plugins/shared/structs"
)
@@ -10694,3 +10695,34 @@ type ACLTokenUpsertResponse struct {
Tokens []*ACLToken
WriteMeta
}
// EEventStreamRequest is used to stream events from a servers
// EventPublisher
type EventStreamRequest struct {
Topics map[stream.Topic][]string
Index int
QueryOptions
}
type EventStreamWrapper struct {
Error *RpcError
Event *stream.NDJson
}
// RpcError is used for serializing errors with a potential error code
type RpcError struct {
Message string
Code *int64
}
func NewRpcError(err error, code *int64) *RpcError {
return &RpcError{
Message: err.Error(),
Code: code,
}
}
func (r *RpcError) Error() string {
return r.Message
}

View File

@@ -47,6 +47,7 @@ func TestServer(t testing.T, cb func(*Config)) (*Server, func()) {
config.Logger = testlog.HCLogger(t)
config.Build = version.Version + "+unittest"
config.DevMode = true
config.EnableEventPublisher = true
config.BootstrapExpect = 1
nodeNum := atomic.AddUint32(&nodeNumber, 1)
config.NodeName = fmt.Sprintf("nomad-%03d", nodeNum)