From bc3f02b2272375c10b19ffefef12175cff4f7a8f Mon Sep 17 00:00:00 2001 From: Michael Lange Date: Mon, 20 Nov 2017 17:09:28 -0800 Subject: [PATCH] Use the b64 log format to track offsets for pause/play support --- ui/app/utils/classes/abstract-logger.js | 33 +++++++++++++ ui/app/utils/classes/poll-logger.js | 33 ++----------- ui/app/utils/classes/stream-logger.js | 65 ++++++++++++++++--------- 3 files changed, 77 insertions(+), 54 deletions(-) create mode 100644 ui/app/utils/classes/abstract-logger.js diff --git a/ui/app/utils/classes/abstract-logger.js b/ui/app/utils/classes/abstract-logger.js new file mode 100644 index 000000000..869a09dc7 --- /dev/null +++ b/ui/app/utils/classes/abstract-logger.js @@ -0,0 +1,33 @@ +import Ember from 'ember'; +import queryString from 'npm:query-string'; + +const { Mixin, computed, assign } = Ember; +const CHUNK_SIZE = 50000; + +export default Mixin.create({ + url: '', + params: computed(() => ({})), + logFetch() { + Ember.assert( + 'Loggers need a logFetch method, which should have an interface like window.fetch' + ); + }, + + endOffset: null, + + offsetParams: computed('endOffset', function() { + const endOffset = this.get('endOffset'); + return endOffset + ? { origin: 'start', offset: endOffset } + : { origin: 'end', offset: CHUNK_SIZE }; + }), + + additionalParams: computed(() => ({})), + + fullUrl: computed('url', 'params', 'offsetParams', 'additionalParams', function() { + const queryParams = queryString.stringify( + assign({}, this.get('params'), this.get('offsetParams'), this.get('additionalParams')) + ); + return `${this.get('url')}?${queryParams}`; + }), +}); diff --git a/ui/app/utils/classes/poll-logger.js b/ui/app/utils/classes/poll-logger.js index 6b628fa67..3077e80a0 100644 --- a/ui/app/utils/classes/poll-logger.js +++ b/ui/app/utils/classes/poll-logger.js @@ -1,38 +1,11 @@ import Ember from 'ember'; -import queryString from 'npm:query-string'; import { task, timeout } from 'ember-concurrency'; +import AbstractLogger from './abstract-logger'; -const { Object: EmberObject, computed, assign } = Ember; +const { Object: EmberObject } = Ember; -export default EmberObject.extend({ - url: '', +export default EmberObject.extend(AbstractLogger, { interval: 1000, - params: computed(() => ({})), - logFetch() { - Ember.assert( - 'Loggers need a logFetch method, which should have an interface like window.fetch' - ); - }, - - endOffset: null, - - fullUrl: computed('url', 'endOffset', 'params', function() { - const endOffset = this.get('endOffset'); - let additionalParams; - if (endOffset) { - additionalParams = { - origin: 'start', - offset: this.get('endOffset'), - }; - } else { - additionalParams = { - origin: 'end', - offset: 50000, - }; - } - const queryParams = queryString.stringify(assign({}, this.get('params'), additionalParams)); - return `${this.get('url')}?${queryParams}`; - }), start() { return this.get('poll').perform(); diff --git a/ui/app/utils/classes/stream-logger.js b/ui/app/utils/classes/stream-logger.js index 1fdaaa3cb..e42c95518 100644 --- a/ui/app/utils/classes/stream-logger.js +++ b/ui/app/utils/classes/stream-logger.js @@ -1,21 +1,17 @@ import Ember from 'ember'; -import queryString from 'npm:query-string'; import { task } from 'ember-concurrency'; import TextDecoder from 'nomad-ui/utils/classes/text-decoder'; +import AbstractLogger from './abstract-logger'; -const { Object: EmberObject, computed, assign } = Ember; - -export default EmberObject.extend({ - url: '', - params: computed(() => ({})), - logFetch() { - Ember.assert( - 'Loggers need a logFetch method, which should have an interface like window.fetch' - ); - }, +const { Object: EmberObject, computed } = Ember; +export default EmberObject.extend(AbstractLogger, { reader: null, + additionalParams: computed(() => ({ + follow: true, + })), + start() { return this.get('poll').perform(); }, @@ -29,26 +25,47 @@ export default EmberObject.extend({ }, poll: task(function*() { - const queryParams = queryString.stringify( - assign({}, this.get('params'), { - plain: true, - follow: true, - origin: 'end', - offset: 50000, - }) - ); - const url = `${this.get('url')}?${queryParams}`; + const url = this.get('fullUrl'); const logFetch = this.get('logFetch'); - let streamClosed = false; - const reader = yield logFetch(url).then(res => res.body.getReader()); - this.set('reader', reader); + let streamClosed = false; + let buffer = ''; const decoder = new TextDecoder(); + const reader = yield logFetch(url).then(res => res.body.getReader()); + + this.set('reader', reader); + while (!streamClosed) { yield reader.read().then(({ value, done }) => { streamClosed = done; - this.get('write')(decoder.decode(value, { stream: true })); + + // There is no guarantee that value will be a complete JSON object, + // so it needs to be buffered. + buffer += decoder.decode(value, { stream: true }); + + // Only when the buffer contains a close bracket can we be sure the buffer + // is in a complete state + if (buffer.indexOf('}') !== -1) { + // The buffer can be one or more complete frames with additional text for the + // next frame + const [, chunk, newBuffer] = buffer.match(/(.*\})(.*)$/); + + // Peel chunk off the front of the buffer (since it represents complete frames) + // and set the buffer to be the remainder + buffer = newBuffer; + + // Assuming the logs endpoint never returns nested JSON (it shouldn't), at this + // point chunk is a series of valid JSON objects with no delimiter. + const lines = chunk.replace(/\}\{/g, '}\n{').split('\n'); + const frames = lines.map(line => JSON.parse(line)).filter(frame => frame.Data); + + if (frames.length) { + frames.forEach(frame => (frame.Data = window.atob(frame.Data))); + this.set('endOffset', frames[frames.length - 1].Offset); + this.get('write')(frames.mapBy('Data').join('')); + } + } }); } }),