mirror of
https://github.com/kemko/nomad.git
synced 2026-01-05 09:55:44 +03:00
Use the b64 log format to track offsets for pause/play support
This commit is contained in:
33
ui/app/utils/classes/abstract-logger.js
Normal file
33
ui/app/utils/classes/abstract-logger.js
Normal file
@@ -0,0 +1,33 @@
|
||||
import Ember from 'ember';
|
||||
import queryString from 'npm:query-string';
|
||||
|
||||
const { Mixin, computed, assign } = Ember;
|
||||
const CHUNK_SIZE = 50000;
|
||||
|
||||
export default Mixin.create({
|
||||
url: '',
|
||||
params: computed(() => ({})),
|
||||
logFetch() {
|
||||
Ember.assert(
|
||||
'Loggers need a logFetch method, which should have an interface like window.fetch'
|
||||
);
|
||||
},
|
||||
|
||||
endOffset: null,
|
||||
|
||||
offsetParams: computed('endOffset', function() {
|
||||
const endOffset = this.get('endOffset');
|
||||
return endOffset
|
||||
? { origin: 'start', offset: endOffset }
|
||||
: { origin: 'end', offset: CHUNK_SIZE };
|
||||
}),
|
||||
|
||||
additionalParams: computed(() => ({})),
|
||||
|
||||
fullUrl: computed('url', 'params', 'offsetParams', 'additionalParams', function() {
|
||||
const queryParams = queryString.stringify(
|
||||
assign({}, this.get('params'), this.get('offsetParams'), this.get('additionalParams'))
|
||||
);
|
||||
return `${this.get('url')}?${queryParams}`;
|
||||
}),
|
||||
});
|
||||
@@ -1,38 +1,11 @@
|
||||
import Ember from 'ember';
|
||||
import queryString from 'npm:query-string';
|
||||
import { task, timeout } from 'ember-concurrency';
|
||||
import AbstractLogger from './abstract-logger';
|
||||
|
||||
const { Object: EmberObject, computed, assign } = Ember;
|
||||
const { Object: EmberObject } = Ember;
|
||||
|
||||
export default EmberObject.extend({
|
||||
url: '',
|
||||
export default EmberObject.extend(AbstractLogger, {
|
||||
interval: 1000,
|
||||
params: computed(() => ({})),
|
||||
logFetch() {
|
||||
Ember.assert(
|
||||
'Loggers need a logFetch method, which should have an interface like window.fetch'
|
||||
);
|
||||
},
|
||||
|
||||
endOffset: null,
|
||||
|
||||
fullUrl: computed('url', 'endOffset', 'params', function() {
|
||||
const endOffset = this.get('endOffset');
|
||||
let additionalParams;
|
||||
if (endOffset) {
|
||||
additionalParams = {
|
||||
origin: 'start',
|
||||
offset: this.get('endOffset'),
|
||||
};
|
||||
} else {
|
||||
additionalParams = {
|
||||
origin: 'end',
|
||||
offset: 50000,
|
||||
};
|
||||
}
|
||||
const queryParams = queryString.stringify(assign({}, this.get('params'), additionalParams));
|
||||
return `${this.get('url')}?${queryParams}`;
|
||||
}),
|
||||
|
||||
start() {
|
||||
return this.get('poll').perform();
|
||||
|
||||
@@ -1,21 +1,17 @@
|
||||
import Ember from 'ember';
|
||||
import queryString from 'npm:query-string';
|
||||
import { task } from 'ember-concurrency';
|
||||
import TextDecoder from 'nomad-ui/utils/classes/text-decoder';
|
||||
import AbstractLogger from './abstract-logger';
|
||||
|
||||
const { Object: EmberObject, computed, assign } = Ember;
|
||||
|
||||
export default EmberObject.extend({
|
||||
url: '',
|
||||
params: computed(() => ({})),
|
||||
logFetch() {
|
||||
Ember.assert(
|
||||
'Loggers need a logFetch method, which should have an interface like window.fetch'
|
||||
);
|
||||
},
|
||||
const { Object: EmberObject, computed } = Ember;
|
||||
|
||||
export default EmberObject.extend(AbstractLogger, {
|
||||
reader: null,
|
||||
|
||||
additionalParams: computed(() => ({
|
||||
follow: true,
|
||||
})),
|
||||
|
||||
start() {
|
||||
return this.get('poll').perform();
|
||||
},
|
||||
@@ -29,26 +25,47 @@ export default EmberObject.extend({
|
||||
},
|
||||
|
||||
poll: task(function*() {
|
||||
const queryParams = queryString.stringify(
|
||||
assign({}, this.get('params'), {
|
||||
plain: true,
|
||||
follow: true,
|
||||
origin: 'end',
|
||||
offset: 50000,
|
||||
})
|
||||
);
|
||||
const url = `${this.get('url')}?${queryParams}`;
|
||||
const url = this.get('fullUrl');
|
||||
const logFetch = this.get('logFetch');
|
||||
let streamClosed = false;
|
||||
|
||||
const reader = yield logFetch(url).then(res => res.body.getReader());
|
||||
this.set('reader', reader);
|
||||
let streamClosed = false;
|
||||
let buffer = '';
|
||||
|
||||
const decoder = new TextDecoder();
|
||||
const reader = yield logFetch(url).then(res => res.body.getReader());
|
||||
|
||||
this.set('reader', reader);
|
||||
|
||||
while (!streamClosed) {
|
||||
yield reader.read().then(({ value, done }) => {
|
||||
streamClosed = done;
|
||||
this.get('write')(decoder.decode(value, { stream: true }));
|
||||
|
||||
// There is no guarantee that value will be a complete JSON object,
|
||||
// so it needs to be buffered.
|
||||
buffer += decoder.decode(value, { stream: true });
|
||||
|
||||
// Only when the buffer contains a close bracket can we be sure the buffer
|
||||
// is in a complete state
|
||||
if (buffer.indexOf('}') !== -1) {
|
||||
// The buffer can be one or more complete frames with additional text for the
|
||||
// next frame
|
||||
const [, chunk, newBuffer] = buffer.match(/(.*\})(.*)$/);
|
||||
|
||||
// Peel chunk off the front of the buffer (since it represents complete frames)
|
||||
// and set the buffer to be the remainder
|
||||
buffer = newBuffer;
|
||||
|
||||
// Assuming the logs endpoint never returns nested JSON (it shouldn't), at this
|
||||
// point chunk is a series of valid JSON objects with no delimiter.
|
||||
const lines = chunk.replace(/\}\{/g, '}\n{').split('\n');
|
||||
const frames = lines.map(line => JSON.parse(line)).filter(frame => frame.Data);
|
||||
|
||||
if (frames.length) {
|
||||
frames.forEach(frame => (frame.Data = window.atob(frame.Data)));
|
||||
this.set('endOffset', frames[frames.length - 1].Offset);
|
||||
this.get('write')(frames.mapBy('Data').join(''));
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}),
|
||||
|
||||
Reference in New Issue
Block a user