diff --git a/bin/delta-processor.ts b/bin/delta-processor.ts index ee42cc6..40842de 100644 --- a/bin/delta-processor.ts +++ b/bin/delta-processor.ts @@ -49,7 +49,12 @@ const db = _createDB( db_conf ); // Prometheus Metrics const prom_factory = new PrometheusFactory(); -const metrics = new MetricsCollector( prom_factory, prom_conf, emitter ); +const metrics = new MetricsCollector( + prom_factory, + prom_conf, + emitter, + process.hrtime, +); // Structured logging new EventLogger( console, env, emitter, ts_ctr ); @@ -60,37 +65,30 @@ let publisher: DeltaPublisher; let processor: DeltaProcessor; _getMongoCollection( db, db_conf ) -.then( ( conn: MongoCollection ) => -{ - return new MongoDeltaDao( conn ); -} ) -.then( ( mongoDao: MongoDeltaDao ) => -{ - dao = mongoDao; - publisher = new DeltaPublisher( amqp_conf, emitter, ts_ctr ); - processor = new DeltaProcessor( mongoDao, publisher, emitter ); -} ) -.then( _ => -{ - publisher.connect(); -} ) -.then( _ => -{ - const pidPath = __dirname + '/../conf/.delta_processor.pid'; + .then( ( conn: MongoCollection ) => { return new MongoDeltaDao( conn ); } ) + .then( ( mongoDao: MongoDeltaDao ) => + { + dao = mongoDao; + publisher = new DeltaPublisher( amqp_conf, emitter, ts_ctr ); + processor = new DeltaProcessor( mongoDao, publisher, emitter ); + } ) + .then( _ => publisher.connect() ) + .then( _ => + { + const pidPath = __dirname + '/../conf/.delta_processor.pid'; - writePidFile(pidPath ); - greet( 'Liza Delta Processor', pidPath ); + writePidFile(pidPath ); + greet( 'Liza Delta Processor', pidPath ); - process_interval = setInterval( - () => - { - processor.process(); - metrics.checkForErrors( dao ); - }, - process_interval_ms, - ); -} ) -.catch( e => { console.error( 'Error: ' + e ); } ); + process_interval = setInterval( () => + { + processor.process(); + metrics.checkForErrors( dao ); + }, + process_interval_ms, + ); + } ) + .catch( e => { console.error( 'Error: ' + e ); } ); /** @@ -117,18 +115,9 @@ function writePidFile( pid_path: string ): void { fs.writeFileSync( pid_path, process.pid ); - process.on( 'SIGINT', function() - { - shutdown( 'SIGINT' ); - } ) - .on( 'SIGTERM', function() - { - shutdown( 'SIGTERM' ); - } ) - .on( 'exit', () => - { - fs.unlink( pid_path, () => {} ); - } ); + process.on( 'SIGINT', () => { shutdown( 'SIGINT' ); } ) + .on( 'SIGTERM', () => { shutdown( 'SIGTERM' ); } ) + .on( 'exit', () => { fs.unlink( pid_path, () => {} ); } ); } @@ -327,7 +316,7 @@ function _getAmqpConfig( env: any ): AmqpConfig 'vhost': env.amqp_vhost, 'exchange': env.amqp_exchange, 'retries': env.amqp_retries || 30, - 'retry_wait': env.amqp_retry_wait || 1, + 'retry_wait': env.amqp_retry_wait || 1000, }; } diff --git a/src/bucket/delta.ts b/src/bucket/delta.ts index 58c75ec..ef2ab77 100644 --- a/src/bucket/delta.ts +++ b/src/bucket/delta.ts @@ -25,6 +25,8 @@ export type Kv = Record; /** Possible delta values for Kv array indexes */ export type DeltaDatum = T | null | undefined; +/** Possible delta types */ +export type DeltaType = 'ratedata' | 'data'; /** * The constructor type for a delta generating function @@ -44,7 +46,7 @@ export type DeltaConstructor = Kv, V extends Kv = export type DeltaResult = { [K in keyof T]: DeltaDatum | null }; - /** +/** * Create delta to transform from src into dest * * @param src - the source data set @@ -97,13 +99,115 @@ export function createDelta, V extends Kv>( } +/** + * Apply a delta to a bucket + * + * @param bucket - The bucket data + * @param delta - The delta to apply + * + * @return the delta + */ +export function applyDelta, V extends Kv>( + bucket: U = {}, + delta: DeltaResult, +): U +{ + const appliedDelta: DeltaResult = {}; + + if( !delta ) + { + return bucket; + } + + // Loop through all keys + const key_set = new Set( + Object.keys( bucket ).concat( Object.keys( delta ) ) ); + + key_set.forEach( key => + { + const bucket_data = bucket[ key ]; + const delta_data = delta[ key ]; + + // If bucket does not contain the key, use entire delta data + if ( !bucket_data || !bucket_data.length ) + { + appliedDelta[ key ] = delta_data; + + return; + } + + // If delta does not contain the key then retain bucket data + if ( delta_data === null ) + { + return; + } + + // If delta does not contain the key then retain bucket data + if ( delta_data === undefined ) + { + appliedDelta[ key ] = bucket_data; + + return; + } + + // If neither condition above is true then create the key iteratively + appliedDelta[ key ] = _applyDeltaKey( bucket_data, delta_data ); + } ); + + return appliedDelta; +} + + +/** + * Apply the delta key iteratively + * + * @param bucket - The bucket data array + * @param delta - The delta data array + * + * @return an object with an changed flag and a data array + */ +function _applyDeltaKey( + bucket: T[], + delta: T[], +): DeltaDatum[] +{ + const data = []; + const max_size = Math.max( delta.length, bucket.length ); + + for ( let i = 0; i < max_size; i++ ) + { + const delta_datum = delta[ i ]; + const bucket_datum = bucket[ i ]; + + if ( delta_datum === null ) + { + break; + } + else if ( delta_datum === undefined ) + { + data[ i ] = bucket_datum; + } + else if ( _deepEqual( delta_datum, bucket_datum ) ) + { + data[ i ] = bucket_datum; + } + else + { + data[ i ] = delta_datum; + } + } + + return data; +} + + /** * Build the delta key iteratively * * @param src - the source data array * @param dest - the destination data array * - * @return an object with an identical flag and a data array + * @return an object with an changed flag and a data array */ function _createDeltaKey( src: T[], diff --git a/src/system/db/MongoError.ts b/src/error/AmqpError.ts similarity index 91% rename from src/system/db/MongoError.ts rename to src/error/AmqpError.ts index 3458c97..8bf308e 100644 --- a/src/system/db/MongoError.ts +++ b/src/error/AmqpError.ts @@ -1,5 +1,5 @@ /** - * Mongodb error + * Amqp error * * Copyright (C) 2010-2019 R-T Specialty, LLC. * @@ -24,4 +24,4 @@ const { Class } = require( 'easejs' ); -export const MongoError = Class( 'MongoError' ).extend( Error, {} ); +export const AmqpError = Class( 'AmqpError' ).extend( Error, {} ); diff --git a/src/error/DaoError.ts b/src/error/DaoError.ts new file mode 100644 index 0000000..2939b69 --- /dev/null +++ b/src/error/DaoError.ts @@ -0,0 +1,27 @@ +/** + * Dao error + * + * Copyright (C) 2010-2019 R-T Specialty, LLC. + * + * This file is part of the Liza Data Collection Framework. + * + * liza is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * This still uses ease.js because it does a good job of transparently + * creating Error subtypes. + */ + +const { Class } = require( 'easejs' ); + +export const DaoError = Class( 'DaoError' ).extend( Error, {} ); diff --git a/src/system/AmqpPublisher.ts b/src/system/AmqpPublisher.ts index 26b4949..12f521c 100644 --- a/src/system/AmqpPublisher.ts +++ b/src/system/AmqpPublisher.ts @@ -22,6 +22,7 @@ */ import { DeltaResult } from "../bucket/delta"; +import { DocumentId } from '../document/Document'; import { Options } from 'amqplib'; @@ -69,7 +70,13 @@ export interface AmqpPublisher /** * Publish quote message to exchange post-rating * - * @param delta - The delta to publish + * @param delta - The delta + * @param bucket - The bucket + * @param doc_id - The doc_id */ - publish( delta: DeltaResult ): Promise; + publish( + delta: DeltaResult, + bucket: Record, + doc_id: DocumentId, + ): Promise } diff --git a/src/system/DeltaProcessor.ts b/src/system/DeltaProcessor.ts index 088a2e6..d8da5cb 100644 --- a/src/system/DeltaProcessor.ts +++ b/src/system/DeltaProcessor.ts @@ -20,116 +20,123 @@ */ import { DeltaDao } from "../system/db/DeltaDao"; -import { MongoDeltaType } from "../system/db/MongoDeltaDao"; -import { DeltaResult } from "../bucket/delta"; +import { DeltaResult, DeltaType, applyDelta } from "../bucket/delta"; import { DocumentId } from "../document/Document"; import { AmqpPublisher } from "./AmqpPublisher"; import { EventEmitter } from "events"; + /** * Process deltas for a quote and publish to a queue */ export class DeltaProcessor { /** The ratedata delta type */ - readonly DELTA_RATEDATA: MongoDeltaType = 'ratedata'; + readonly DELTA_RATEDATA: DeltaType = 'ratedata'; /** The data delta type */ - readonly DELTA_DATA: MongoDeltaType = 'data'; + readonly DELTA_DATA: DeltaType = 'data'; /** * Initialize processor * - * @param _dao - Mongo collection + * @param _dao - Delta dao * @param _publisher - Amqp Publisher * @param _emitter - Event emiter instance */ constructor( private readonly _dao: DeltaDao, private readonly _publisher: AmqpPublisher, - private readonly _emitter: EventEmitter + private readonly _emitter: EventEmitter, ) {} /** * Process unpublished deltas */ - process(): void + process(): Promise { - this._dao.getUnprocessedDocuments() - .then( docs => - { - docs.forEach( doc => - { - const deltas = this.getTimestampSortedDeltas( doc ); - const doc_id: DocumentId = doc.id; - const last_updated_ts = doc.lastUpdate; - - for ( let i = 0; i < deltas.length; i++ ) - { - const delta = deltas[ i ]; - const startTime = process.hrtime(); - let error = null; - - this._publisher.publish( delta ) - .then( _ => - { - this._dao.advanceDeltaIndex( doc_id, delta.type ); - } ) - .catch( err => - { - this._dao.setErrorFlag( doc_id ); - - error = err; - } ); - - // Do not process any more deltas for - // this document if there was an error - if ( error ) - { - this._emitter.emit( - 'delta-process-error', - error, - doc_id + delta.timestamp + delta.type - ); - - return; - } - else - { - const elapsedTime = process.hrtime( startTime ); - - this._emitter.emit( - 'delta-process-complete', - elapsedTime[ 1 ] / 10000 - ); - } - }; - - this._dao.markDocumentAsProcessed( doc_id, last_updated_ts ) - .then( _ => - { - this._emitter.emit( - 'document-processed', - 'Deltas on document ' + doc_id + ' processed ' - + 'successfully. Document has been marked as ' - + 'completely processed.' - ); - } ) - .catch( err => - { - this._emitter.emit( 'mongodb-err', err ); - } ); - } ); - } ) - .catch( err => - { - this._emitter.emit( 'mongodb-err', err ); - } ); + return this._dao.getUnprocessedDocuments() + .then( docs => this._processNext( docs ) ) + .catch( err => { this._emitter.emit( 'dao-err', err ) } ); } + private _processNext( docs: any ): Promise + { + if ( docs.length === 0 ) + { + return Promise.resolve(); + } + + const doc = docs.shift(); + + return this._processDocument( doc ) + .then( _ => this._processNext( docs ) ); + } + + + private _processDocument( doc: Record ): Promise + { + const deltas = this.getTimestampSortedDeltas( doc ); + const doc_id: DocumentId = doc.id; + const bucket = doc.data; + const last_updated_ts = doc.lastUpdate; + + return this._processNextDelta( deltas, bucket, doc_id ) + .then( _ => + this._dao.markDocumentAsProcessed( doc_id, last_updated_ts ) + ) + .then( _ => + { + this._emitter.emit( + 'document-processed', + 'Deltas on document ' + doc_id + ' processed ' + + 'successfully. Document has been marked as ' + + 'completely processed.' + ); + } ) + .catch( e => + { + this._emitter.emit( 'delta-err', e ); + this._dao.setErrorFlag( doc_id ); + } ); + } + + + private _processNextDelta( + deltas: DeltaResult[], + bucket: Record, + doc_id: DocumentId, + ): Promise + { + if ( deltas.length === 0 ) + { + return Promise.resolve(); + } + + const delta = deltas.shift(); + + if ( !delta ) + { + return Promise.reject( new Error( 'Undefined delta' ) ); + } + + const delta_uid = doc_id + '_' + delta.timestamp + '_' + delta.type; + + this._emitter.emit( 'delta-process-start', delta_uid ); + + const new_bucket = applyDelta( bucket, delta.data ); + + return this._publisher.publish( delta, new_bucket, doc_id ) + .then( _ => this._dao.advanceDeltaIndex( doc_id, delta.type ) ) + .then( _ => this._emitter.emit( 'delta-process-end', delta_uid ) ) + .then( _ => this._processNextDelta( deltas, new_bucket, doc_id ) ); + } + + + /** * Get sorted list of deltas * @@ -157,7 +164,7 @@ export class DeltaProcessor * * @return a trimmed list of deltas */ - getDeltas( doc: any, type: MongoDeltaType ): DeltaResult[] + getDeltas( doc: any, type: DeltaType ): DeltaResult[] { const deltas_obj = doc.rdelta || {}; const deltas: DeltaResult[] = deltas_obj[ type ] || []; diff --git a/src/system/DeltaPublisher.ts b/src/system/DeltaPublisher.ts index d31cce2..a5b68b8 100644 --- a/src/system/DeltaPublisher.ts +++ b/src/system/DeltaPublisher.ts @@ -24,6 +24,9 @@ import { AmqpPublisher, AmqpConfig } from './AmqpPublisher'; import { DeltaResult } from '../bucket/delta'; import { EventEmitter } from "events"; +import { DocumentId } from '../document/Document'; +import { context } from '../error/ContextError'; +import { AmqpError } from '../error/AmqpError'; import { connect as amqpConnect, Channel, @@ -79,63 +82,18 @@ export class DeltaPublisher implements AmqpPublisher /** * Initialize connection */ - connect(): Promise + connect(): Promise { - return new Promise( ( resolve, reject ) => - { - amqpConnect( this._conf ) + return amqpConnect( this._conf ) .then( conn => { this._conn = conn; - // If there is an error, attemp to reconnect + // If there is an error, attempt to reconnect this._conn.on( 'error', e => { this._emitter.emit( 'amqp-conn-error', e ); - - let reconnect_interval: NodeJS.Timer; - - let retry_count = 0; - - const reconnect = () => - { - if ( ++retry_count >= this._conf.retries ) - { - clearInterval( reconnect_interval ); - - this._emitter.emit( - 'amqp-reconnect-fail', - 'Could not re-establish AMQP connection.' - ); - - return; - } - - this._emitter.emit( - 'amqp-reconnect', - '...attempting to re-establish AMQP connection' - ); - - this.connect() - .then( _ => - { - clearInterval( reconnect_interval ); - - this._emitter.emit( - 'amqp-reconnect', - 'AMQP re-connected' - ); - } ) - .catch( e => - { - this._emitter.emit( 'amqp-conn-error', e ); - } ); - } - - reconnect_interval = setInterval( - reconnect, - ( this._conf.retry_wait * 1000 ) - ); + this._reconnect(); } ); return this._conn.createChannel(); @@ -149,16 +107,45 @@ export class DeltaPublisher implements AmqpPublisher 'fanout', { durable: true } ); - - resolve(); - return; - } ) - .catch( e => - { - reject( e ); - return; } ); - } ); + } + + + /** + * Attempt to re-establish the connection + * + * @return Whether reconnecting was successful + */ + private _reconnect( retry_count: number = 0 ): void + { + if ( retry_count >= this._conf.retries ) + { + this._emitter.emit( + 'amqp-reconnect-fail', + 'Could not re-establish AMQP connection.' + ); + + return; + } + + this._emitter.emit( + 'amqp-reconnect', + '...attempting to re-establish AMQP connection' + ); + + this.connect() + .then( _ => + { + this._emitter.emit( + 'amqp-reconnect', + 'AMQP re-connected' + ); + } ) + .catch( _ => + { + const wait_ms = this._conf.retry_wait; + setTimeout( () => this._reconnect( ++retry_count ), wait_ms ); + } ); } @@ -177,15 +164,17 @@ export class DeltaPublisher implements AmqpPublisher /** * Publish quote message to exchange post-rating * - * @param delta - The delta to publish - * - * @return whether the message was published successfully + * @param delta - The delta + * @param bucket - The bucket + * @param doc_id - The doc_id */ - publish( delta: DeltaResult ): Promise + publish( + delta: DeltaResult, + bucket: Record, + doc_id: DocumentId, + ): Promise { - return new Promise( ( resolve, reject ) => - { - this.sendMessage( delta ) + return this.sendMessage( delta, bucket, doc_id ) .then( _ => { this._emitter.emit( @@ -194,85 +183,66 @@ export class DeltaPublisher implements AmqpPublisher + delta.timestamp + "' to '" + this._conf.exchange + '" exchange', ); - - resolve(); - return; - } ) - .catch( e => - { - this._emitter.emit( - 'publish-err', - "Error publishing " + delta.type + " delta with ts '" - + delta.timestamp + '" to "' + this._conf.exchange - + "' exchange: '" + e, - ) - - reject(); } ); - } ); } /** * Send message to exchange * - * @param delta - The delta to publish + * @param delta - The delta to publish + * @param bucket - The bucket + * @param doc_id - The doc_id * * @return whether publish was successful */ - sendMessage( delta: DeltaResult ): Promise + sendMessage( + delta: DeltaResult, + bucket: Record, + doc_id: DocumentId, + ): Promise { - return new Promise( ( resolve, reject ) => + return new Promise( ( resolve, reject ) => { const ts = this._ts_ctr(); const headers = { version: 1, created: ts }; - const delta_data = this.avroFormat( delta.data ); - const event_id = this.DELTA_MAP[ delta.type ]; - const avro_buffer = this.avroEncode( { - event: { - id: event_id, - ts: ts, - actor: 'SERVER', - step: null, - }, - document: { - id: 123123, // Fix - }, - session: { - entity_name: 'Foobar', // Fix - entity_id: 123123, // Fix - }, - data: { - Data: { - bucket: delta_data, - }, - }, - delta: { - Data: { - bucket: delta_data, - }, - }, - program: { - Program: { - id: 'quote_server', - version: 'dadaddwafdwa', // Fix - }, - }, - } ); + const avro_object = this.avroFormat( delta, bucket, doc_id, ts ); + const avro_buffer = this.avroEncode( avro_object ); if ( !this._conn ) { - reject( 'Error sending message: No connection' ); + reject( context ( + new AmqpError( 'Error sending message: No connection' ), + { + doc_id: doc_id, + delta_type: delta.type, + delta_ts: delta.ts, + }, + ) ); return; } else if ( !this._channel ) { - reject( 'Error sending message: No channel' ); + reject( context ( + new AmqpError( 'Error sending message: No channel' ), + { + doc_id: doc_id, + delta_type: delta.type, + delta_ts: delta.ts, + }, + ) ); return; } else if ( !avro_buffer ) { - reject( 'Error sending message: No avro buffer' ); + reject( context ( + new Error( 'Error sending message: No avro buffer' ), + { + doc_id: doc_id, + delta_type: delta.type, + delta_ts: delta.ts, + }, + ) ); return; } @@ -290,11 +260,61 @@ export class DeltaPublisher implements AmqpPublisher return; } - reject( 'Error sending message: publishing failed' ); + reject( context( + new Error ( 'Error sending message: publishing failed' ), + { + doc_id: doc_id, + delta_type: delta.type, + delta_ts: delta.ts, + } + ) ); } ); } + avroFormat( + delta: DeltaResult, + _bucket: Record, + doc_id: DocumentId, + ts: UnixTimestamp, + ): any + { + const delta_data = this.setDataTypes( delta.data ); + const event_id = this.DELTA_MAP[ delta.type ]; + + return { + event: { + id: event_id, + ts: ts, + actor: 'SERVER', + step: null, + }, + document: { + id: doc_id + }, + session: { + entity_name: 'Foobar', // Fix + entity_id: 123123, // Fix + }, + data: { + Data: { + bucket: _bucket, + }, + }, + delta: { + Data: { + bucket: delta_data, + }, + }, + program: { + Program: { + id: 'quote_server', + version: 'dadaddwafdwa', // Fix + }, + }, + } + } + /** * Encode the data in an avro buffer * @@ -339,7 +359,7 @@ export class DeltaPublisher implements AmqpPublisher * * @return the formatted data */ - avroFormat( data: any, top_level: boolean = true ): any + setDataTypes( data: any, top_level: boolean = true ): any { let data_formatted: any = {}; @@ -356,7 +376,7 @@ export class DeltaPublisher implements AmqpPublisher data.forEach( ( datum ) => { - arr.push( this.avroFormat( datum, false ) ); + arr.push( this.setDataTypes( datum, false ) ); } ); data_formatted = ( top_level ) @@ -369,7 +389,7 @@ export class DeltaPublisher implements AmqpPublisher Object.keys( data).forEach( ( key: string ) => { - const datum = this.avroFormat( data[ key ], false ); + const datum = this.setDataTypes( data[ key ], false ); datum_formatted[ key ] = datum; diff --git a/src/system/EventLogger.ts b/src/system/EventLogger.ts index 05e6452..ef46aca 100644 --- a/src/system/EventLogger.ts +++ b/src/system/EventLogger.ts @@ -72,7 +72,7 @@ export class EventLogger this._registerEvent( 'amqp-reconnect', LogLevel.WARNING ); this._registerEvent( 'amqp-reconnect-fail', LogLevel.ERROR ); this._registerEvent( 'avro-err', LogLevel.ERROR ); - this._registerEvent( 'mongodb-err', LogLevel.ERROR ); + this._registerEvent( 'dao-err', LogLevel.ERROR ); this._registerEvent( 'publish-err', LogLevel.ERROR ); // this._registerEvent( 'log', LogLevel.INFO ); diff --git a/src/system/MetricsCollector.ts b/src/system/MetricsCollector.ts index c70dbe3..30c4ea2 100644 --- a/src/system/MetricsCollector.ts +++ b/src/system/MetricsCollector.ts @@ -43,6 +43,11 @@ export declare type PrometheusConfig = { } +export type MetricTimer = ( + _start_time?: [ number, number ] +) => [ number, number ]; + + export class MetricsCollector { /** The prometheus PushGateway */ @@ -70,6 +75,10 @@ export class MetricsCollector private _total_processed_help: string = 'Total deltas successfully processed'; + /** Timing map */ + private _timing_map: Record = {}; + + /** * Initialize delta logger * @@ -81,6 +90,7 @@ export class MetricsCollector private readonly _factory: PrometheusFactory, private readonly _conf: PrometheusConfig, private readonly _emitter: EventEmitter, + private readonly _timer: MetricTimer, ) { // Set labels client.register.setDefaultLabels( { @@ -143,10 +153,22 @@ export class MetricsCollector private hookMetrics() { this._emitter.on( - 'delta-process-complete', - ( val: any ) => + 'delta-process-start', + ( uid: string ) => { - this._process_time.observe( val ); + this._timing_map[ uid ] = this._timer(); + } + ); + + this._emitter.on( + 'delta-process-end', + ( uid: string ) => + { + const start_time_ms = this._timing_map[ uid ] || [ -1, -1 ]; + const t = this._timer( start_time_ms ); + const total_time_ms = ( ( t[ 0 ] * 1000 ) + ( t[ 1 ] / 1000 ) ); + + this._process_time.observe( total_time_ms ); this._total_processed.inc(); } ); @@ -176,7 +198,6 @@ export class MetricsCollector } - /** * Look for mongodb delta errors and update metrics if found * @@ -185,8 +206,8 @@ export class MetricsCollector checkForErrors( dao: DeltaDao ): NullableError { dao.getErrorCount() - .then( count => { this._current_error.set( +count ); } ) - .catch( err => { return err; } ); + .then( count => { this._current_error.set( +count ); } ) + .catch( err => { return err; } ); return null; } diff --git a/src/system/db/DeltaDao.ts b/src/system/db/DeltaDao.ts index 5ad09fc..d7d2544 100644 --- a/src/system/db/DeltaDao.ts +++ b/src/system/db/DeltaDao.ts @@ -52,7 +52,7 @@ export interface DeltaDao advanceDeltaIndex( doc_id: DocumentId, type: string, - ): Promise + ): Promise /** @@ -67,7 +67,7 @@ export interface DeltaDao markDocumentAsProcessed( doc_id: DocumentId, last_update_ts: UnixTimestamp, - ): Promise + ): Promise /** @@ -77,7 +77,7 @@ export interface DeltaDao * * @return any errors that occurred */ - setErrorFlag( doc_id: DocumentId ): Promise + setErrorFlag( doc_id: DocumentId ): Promise /** diff --git a/src/system/db/MongoDeltaDao.ts b/src/system/db/MongoDeltaDao.ts index 2dfa9f0..4ab4e91 100644 --- a/src/system/db/MongoDeltaDao.ts +++ b/src/system/db/MongoDeltaDao.ts @@ -25,10 +25,8 @@ import { DocumentId } from '../../document/Document'; import { DeltaDao } from './DeltaDao'; import { MongoCollection } from 'mongodb'; import { context } from '../../error/ContextError'; -import { MongoError } from './MongoError'; - -export type MongoDeltaType = 'ratedata' | 'data'; - +import { DaoError } from '../../error/DaoError'; +import { DeltaType } from '../../bucket/delta'; /** Manage deltas */ export class MongoDeltaDao implements DeltaDao @@ -60,14 +58,17 @@ export class MongoDeltaDao implements DeltaDao return new Promise( ( resolve, reject ) => { this._collection.find( - { published: false }, + { + published: false, + deltaError: false, + }, {}, ( e, cursor ) => { if ( e ) { reject( - new MongoError( + new DaoError( 'Error fetching unprocessed documents: ' + e ) ); @@ -79,7 +80,7 @@ export class MongoDeltaDao implements DeltaDao if ( e ) { reject( - new MongoError( + new DaoError( 'Error fetching array from cursor: ' + e ) ); @@ -104,8 +105,8 @@ export class MongoDeltaDao implements DeltaDao */ advanceDeltaIndex( doc_id: DocumentId, - type: MongoDeltaType, - ): Promise + type: DeltaType, + ): Promise { return new Promise( ( resolve, reject ) => { @@ -122,7 +123,7 @@ export class MongoDeltaDao implements DeltaDao if ( e ) { reject( context( - new MongoError( + new DaoError( 'Error advancing delta index: ' + e ), { @@ -154,7 +155,7 @@ export class MongoDeltaDao implements DeltaDao markDocumentAsProcessed( doc_id: DocumentId, last_update_ts: UnixTimestamp, - ): Promise + ): Promise { return new Promise( ( resolve, reject ) => { @@ -167,7 +168,7 @@ export class MongoDeltaDao implements DeltaDao if ( e ) { reject( context( - new MongoError( + new DaoError( 'Error marking document as processed: ' + e ), { @@ -193,7 +194,7 @@ export class MongoDeltaDao implements DeltaDao * * @return any errors that occurred */ - setErrorFlag( doc_id: DocumentId ): Promise + setErrorFlag( doc_id: DocumentId ): Promise { return new Promise( ( resolve, reject ) => { @@ -206,7 +207,7 @@ export class MongoDeltaDao implements DeltaDao if ( e ) { reject( context( - new MongoError( + new DaoError( 'Failed setting error flag: ' + e ), { @@ -253,7 +254,7 @@ export class MongoDeltaDao implements DeltaDao if ( e ) { reject( context( - new MongoError( + new DaoError( 'Failed getting error count: ' + e ), { diff --git a/test/bucket/delta.ts b/test/bucket/delta.ts index ba1d192..cc9a790 100644 --- a/test/bucket/delta.ts +++ b/test/bucket/delta.ts @@ -19,12 +19,18 @@ * along with this program. If not, see . * */ -import { createDelta as sut, Kv , DeltaResult} from "../../src/bucket/delta"; +import { + createDelta as sutCreate, + applyDelta as sutApply, + Kv, + DeltaResult, +} from "../../src/bucket/delta"; import { expect, use as chai_use } from 'chai'; chai_use( require( 'chai-as-promised' ) ); -interface SutTestCase + +interface SutCreateTestCase { label: string; src_data: T; @@ -32,68 +38,149 @@ interface SutTestCase expected: DeltaResult; } + +interface SutApplyTestCase +{ + label: string; + bucket: T; + delta: DeltaResult; + expected: T; +} + + describe( 'Delta', () => { - ( >[]>[ - { - label: "No changes are made, key is dropped", - src_data: { foo: [ 'bar', 'baz' ] }, - dest_data: { foo: [ 'bar', 'baz' ] }, - expected: {}, - }, - { - label: "Only the unchanged key is dropped", - src_data: { foo: [ 'bar', 'baz' ], bar: [ 'qwe' ] }, - dest_data: { foo: [ 'bar', 'baz' ], bar: [ 'asd' ] }, - expected: { bar: [ 'asd' ] }, - }, - { - label: "Changed values are updated by index with old value", - src_data: { foo: [ "bar", "baz", "quux" ] }, - dest_data: { foo: [ "bar", "quuux" ], moo: [ "cow" ] }, - expected: { foo: [ undefined, "quuux", null ], moo: [ "cow" ] }, - }, - { - label: "The keys are null when they don't exist in first set", - src_data: {}, - dest_data: { foo: [ "bar", "quuux" ], moo: [ "cow" ] }, - expected: { foo: [ "bar", "quuux" ], moo: [ "cow" ] }, - }, - { - label: "Removed keys in new set show up", - src_data: { foo: [ "bar" ] }, - dest_data: {}, - expected: { foo: null }, - }, - { - label: "Indexes after a null terminator aren't included", - src_data: { foo: [ "one", "two", "three", "four" ] }, - dest_data: { foo: [ "one", "done" ] }, - expected: { foo: [ undefined, "done", null ] }, - }, - { - label: "Consider nested arrays to be scalar values", - src_data: { foo: [ [ "one" ], [ "two", "three" ] ] }, - dest_data: { foo: [ [ "one" ], [ "two" ] ] }, - expected: { foo: [ undefined, [ "two" ] ] }, - }, - { - label: "Don't evaluate zeros as falsy", - src_data: { foo: [ 0 ] }, - dest_data: { foo: [ 0 ] }, - expected: {}, - }, - { - label: "Don't evaluate empty strings as falsy", - src_data: { foo: [ '' ] }, - dest_data: { foo: [ '' ] }, - expected: {}, - }, - ] ).forEach( ( { label, src_data, dest_data, expected } ) => + describe( '#createDelta', () => { - it( label, () => + ( >[]>[ + { + label: "No changes are made, key is dropped", + src_data: { foo: [ 'bar', 'baz' ] }, + dest_data: { foo: [ 'bar', 'baz' ] }, + expected: {}, + }, + { + label: "Only the unchanged key is dropped", + src_data: { foo: [ 'bar', 'baz' ], bar: [ 'qwe' ] }, + dest_data: { foo: [ 'bar', 'baz' ], bar: [ 'asd' ] }, + expected: { bar: [ 'asd' ] }, + }, + { + label: "Changed values are updated by index with old value", + src_data: { foo: [ "bar", "baz", "quux" ] }, + dest_data: { foo: [ "bar", "quuux" ], moo: [ "cow" ] }, + expected: { foo: [ undefined, "quuux", null ], moo: [ "cow" ] }, + }, + { + label: "The keys are null when they don't exist in first set", + src_data: {}, + dest_data: { foo: [ "bar", "quuux" ], moo: [ "cow" ] }, + expected: { foo: [ "bar", "quuux" ], moo: [ "cow" ] }, + }, + { + label: "Removed keys in new set show up", + src_data: { foo: [ "bar" ] }, + dest_data: {}, + expected: { foo: null }, + }, + { + label: "Indexes after a null terminator aren't included", + src_data: { foo: [ "one", "two", "three", "four" ] }, + dest_data: { foo: [ "one", "done" ] }, + expected: { foo: [ undefined, "done", null ] }, + }, + { + label: "Consider nested arrays to be scalar values", + src_data: { foo: [ [ "one" ], [ "two", "three" ] ] }, + dest_data: { foo: [ [ "one" ], [ "two" ] ] }, + expected: { foo: [ undefined, [ "two" ] ] }, + }, + { + label: "Don't evaluate zeros as falsy", + src_data: { foo: [ 0 ] }, + dest_data: { foo: [ 0 ] }, + expected: {}, + }, + { + label: "Don't evaluate empty strings as falsy", + src_data: { foo: [ '' ] }, + dest_data: { foo: [ '' ] }, + expected: {}, + }, + ] ).forEach( ( { label, src_data, dest_data, expected } ) => { - expect( sut( src_data, dest_data ) ).to.deep.equal( expected ); + it( label, () => + { + expect( sutCreate( src_data, dest_data ) ) + .to.deep.equal( expected ); + } ); + } ); + } ); + + + describe( '#applyDelta', () => + { + ( >[]>[ + { + label: "Empty delta changes nothing", + bucket: { foo: [ 'bar', 'baz' ] }, + delta: {}, + expected: { foo: [ 'bar', 'baz' ] }, + }, + { + label: "Field not in delta is unchanged", + bucket: { foo: [ 'bar', 'baz' ], bar: [ 'qwe' ] }, + delta: { bar: [ 'asd' ] }, + expected: { foo: [ 'bar', 'baz' ], bar: [ 'asd' ] }, + }, + { + label: "Undefined doesn't affect its corresponding index", + bucket: { foo: [ "bar", "baz", "quux" ] }, + delta: { foo: [ undefined, "quuux", null ], moo: [ "cow" ] }, + expected: { foo: [ "bar", "quuux" ], moo: [ "cow" ] }, + }, + { + label: "Delta applys correctly on empty bucket", + bucket: {}, + delta: { foo: [ "bar", "quuux" ], moo: [ "cow" ] }, + expected: { foo: [ "bar", "quuux" ], moo: [ "cow" ] }, + }, + { + label: "Keys are removed properly", + bucket: { foo: [ "bar" ] }, + delta: { foo: null }, + expected: {}, + }, + { + label: "Indexes after a null terminator aren't included", + bucket: { foo: [ "one", "two", "three", "four" ] }, + delta: { foo: [ undefined, "done", null ] }, + expected: { foo: [ "one", "done" ] }, + }, + { + label: "Consider nested arrays to be scalar values", + bucket: { foo: [ [ "one" ], [ "two", "three" ] ] }, + delta: { foo: [ undefined, [ "two" ] ] }, + expected: { foo: [ [ "one" ], [ "two" ] ] }, + }, + { + label: "Don't evaluate zeros as falsy", + bucket: { foo: [ 0 ] }, + delta: {}, + expected: { foo: [ 0 ] }, + }, + { + label: "Don't evaluate empty strings as falsy", + bucket: { foo: [ '' ] }, + delta: {}, + expected: { foo: [ '' ] }, + }, + ] ).forEach( ( { label, bucket, delta, expected } ) => + { + it( label, () => + { + expect( sutApply( bucket, delta ) ).to.deep.equal( expected ); + } ); } ); } ); } ); diff --git a/test/system/DeltaProcessorTest.ts b/test/system/DeltaProcessorTest.ts index e1db8d2..d0cd8f8 100644 --- a/test/system/DeltaProcessorTest.ts +++ b/test/system/DeltaProcessorTest.ts @@ -22,7 +22,7 @@ import { DeltaProcessor as Sut } from '../../src/system/DeltaProcessor'; import { AmqpPublisher } from '../../src/system/AmqpPublisher'; import { DeltaDao } from '../../src/system/db/DeltaDao'; -import { MongoDeltaType } from '../../src/system/db/MongoDeltaDao'; +import { DeltaType } from "../../src/bucket/delta"; import { EventEmitter } from 'events'; import { expect, use as chai_use } from 'chai'; @@ -40,21 +40,21 @@ describe( 'system.DeltaProcessor', () => rdelta: { data: [ { - data: { foo: 'first_bar' }, + data: { foo: [ 'first_bar' ] }, timestamp: 123, }, { - data: { foo: 'second_bar' }, + data: { foo: [ 'second_bar' ] }, timestamp: 234, }, ], ratedata: [ { - data: { foo: 'third_bar' }, + data: { foo: [ 'third_bar' ] }, timestamp: 345, }, { - data: { foo: 'fourth_bar' }, + data: { foo: [ 'fourth_bar' ] }, timestamp: 456, }, ] @@ -62,22 +62,22 @@ describe( 'system.DeltaProcessor', () => }, expected: [ { - data: { foo: 'first_bar' }, + data: { foo: [ 'first_bar' ] }, timestamp: 123, type: 'data', }, { - data: { foo: 'second_bar' }, + data: { foo: [ 'second_bar' ] }, timestamp: 234, type: 'data', }, { - data: { foo: 'third_bar' }, + data: { foo: [ 'third_bar' ] }, timestamp: 345, type: 'ratedata', }, { - data: { foo: 'fourth_bar' }, + data: { foo: [ 'fourth_bar' ] }, timestamp: 456, type: 'ratedata', }, @@ -89,11 +89,11 @@ describe( 'system.DeltaProcessor', () => rdelta: { data: [ { - data: { foo: 'first_bar' }, + data: { foo: [ 'first_bar' ] }, timestamp: 123, }, { - data: { foo: 'second_bar' }, + data: { foo: [ 'second_bar' ] }, timestamp: 234, }, ], @@ -102,12 +102,12 @@ describe( 'system.DeltaProcessor', () => }, expected: [ { - data: { foo: 'first_bar' }, + data: { foo: [ 'first_bar' ] }, timestamp: 123, type: 'data', }, { - data: { foo: 'second_bar' }, + data: { foo: [ 'second_bar' ] }, timestamp: 234, type: 'data', }, @@ -119,21 +119,21 @@ describe( 'system.DeltaProcessor', () => rdelta: { data: [ { - data: { foo: 'first_bar' }, + data: { foo: [ 'first_bar' ] }, timestamp: 123, }, { - data: { foo: 'second_bar' }, + data: { foo: [ 'second_bar' ] }, timestamp: 234, }, { - data: { foo: 'fourth_bar' }, + data: { foo: [ 'fourth_bar' ] }, timestamp: 456, }, ], ratedata: [ { - data: { foo: 'third_bar' }, + data: { foo: [ 'third_bar' ] }, timestamp: 345, }, ], @@ -141,22 +141,22 @@ describe( 'system.DeltaProcessor', () => }, expected: [ { - data: { foo: 'first_bar' }, + data: { foo: [ 'first_bar' ] }, timestamp: 123, type: 'data', }, { - data: { foo: 'second_bar' }, + data: { foo: [ 'second_bar' ] }, timestamp: 234, type: 'data', }, { - data: { foo: 'third_bar' }, + data: { foo: [ 'third_bar' ] }, timestamp: 345, type: 'ratedata', }, { - data: { foo: 'fourth_bar' }, + data: { foo: [ 'fourth_bar' ] }, timestamp: 456, type: 'data', }, @@ -181,7 +181,7 @@ describe( 'system.DeltaProcessor', () => { ( <{ label: string, - type: MongoDeltaType, + type: DeltaType, given: any, expected: any }[]>[ @@ -200,11 +200,11 @@ describe( 'system.DeltaProcessor', () => rdelta: { data: [ { - data: { foo: 'first_bar' }, + data: { foo: [ 'first_bar' ] }, timestamp: 123, }, { - data: { foo: 'second_bar' }, + data: { foo: [ 'second_bar' ] }, timestamp: 234, }, ], @@ -212,12 +212,12 @@ describe( 'system.DeltaProcessor', () => }, expected: [ { - data: { foo: 'first_bar' }, + data: { foo: [ 'first_bar' ] }, timestamp: 123, type: 'data', }, { - data: { foo: 'second_bar' }, + data: { foo: [ 'second_bar' ] }, timestamp: 234, type: 'data', }, @@ -230,11 +230,11 @@ describe( 'system.DeltaProcessor', () => rdelta: { data: [ { - data: { foo: 'first_bar' }, + data: { foo: [ 'first_bar' ] }, timestamp: 123, }, { - data: { foo: 'second_bar' }, + data: { foo: [ 'second_bar' ] }, timestamp: 234, }, ], @@ -245,12 +245,12 @@ describe( 'system.DeltaProcessor', () => }, expected: [ { - data: { foo: 'first_bar' }, + data: { foo: [ 'first_bar' ] }, timestamp: 123, type: 'data', }, { - data: { foo: 'second_bar' }, + data: { foo: [ 'second_bar' ] }, timestamp: 234, type: 'data', }, @@ -263,11 +263,11 @@ describe( 'system.DeltaProcessor', () => rdelta: { data: [ { - data: { foo: 'first_bar' }, + data: { foo: [ 'first_bar' ] }, timestamp: 123, }, { - data: { foo: 'second_bar' }, + data: { foo: [ 'second_bar' ] }, timestamp: 234, }, ], @@ -278,7 +278,7 @@ describe( 'system.DeltaProcessor', () => }, expected: [ { - data: { foo: 'second_bar' }, + data: { foo: [ 'second_bar' ] }, timestamp: 234, type: 'data', }, @@ -297,6 +297,188 @@ describe( 'system.DeltaProcessor', () => expect( actual ).to.deep.equal( expected ); } ) ); } ); + + + describe( '#process', () => + { + ( <{ + label: string, + given: any[], + expected: any + }[]>[ + { + label: 'No deltas are processed', + docs: [ + { + id: 123, + lastUpdate: 123123123, + bucket: {}, + rdelta: {}, + }, + ], + expected: [], + }, + { + label: 'Publishes deltas in order', + given: [ + { + id: 123, + lastUpdate: 123123123, + bucket: { foo: [ 'start_bar' ] }, + rdelta: { + data: [ + { + data: { foo: [ 'first_bar' ] }, + timestamp: 123123, + }, + { + data: { foo: [ 'second_bar' ] }, + timestamp: 234123, + }, + ], + }, + }, + ], + expected: [ + { + delta: { foo: [ 'first_bar' ] }, + bucket: { foo: [ 'first_bar' ] }, + doc_id: 123, + }, + { + delta: { foo: [ 'second_bar' ] }, + bucket: { foo: [ 'second_bar' ] }, + doc_id: 123, + }, + ], + }, + { + label: 'Publishes deltas in order for multiple documents', + given: [ + { + id: 123, + lastUpdate: 123123123, + bucket: { foo: 'start_bar' }, + rdelta: { + data: [ + { + data: { foo: [ 'second_bar' ] }, + timestamp: 234, + }, + ], + ratedata: [ + { + data: { foo: [ 'first_bar' ] }, + timestamp: 123, + }, + ], + }, + }, + { + id: 234, + lastUpdate: 123123123, + bucket: { foo: 'start_bar' }, + rdelta: { + data: [ + { + data: { foo: [ 'first_bar' ] }, + timestamp: 123, + }, + { + data: { foo: [ 'second_bar' ] }, + timestamp: 234, + }, + { + data: { foo: [ 'third_bar' ] }, + timestamp: 345, + }, + ], + }, + }, + { + id: 345, + lastUpdate: 123123123, + bucket: { foo: 'start_bar' }, + rdelta: { + ratedata: [ + { + data: { foo: [ 'first_bar' ] }, + timestamp: 123, + }, + { + data: { foo: [ 'second_bar' ] }, + timestamp: 234, + }, + ], + }, + }, + ], + expected: [ + { + delta: { foo: [ 'first_bar' ] }, + bucket: { foo: [ 'first_bar' ] }, + doc_id: 123, + }, + { + delta: { foo: [ 'second_bar' ] }, + bucket: { foo: [ 'second_bar' ] }, + doc_id: 123, + }, + { + delta: { foo: [ 'first_bar' ] }, + bucket: { foo: [ 'first_bar' ] }, + doc_id: 234, + }, + { + delta: { foo: [ 'second_bar' ] }, + bucket: { foo: [ 'second_bar' ] }, + doc_id: 234, + }, + { + delta: { foo: [ 'third_bar' ] }, + bucket: { foo: [ 'third_bar' ] }, + doc_id: 234, + }, + { + delta: { foo: [ 'first_bar' ] }, + bucket: { foo: [ 'first_bar' ] }, + doc_id: 345, + }, + { + delta: { foo: [ 'second_bar' ] }, + bucket: { foo: [ 'second_bar' ] }, + doc_id: 345, + }, + ], + }, + ] ).forEach( ( { given, expected, label } ) => it( label, () => + { + let published: any = []; + const dao = createMockDeltaDao(); + const publisher = createMockDeltaPublisher(); + const emitter = new EventEmitter(); + + dao.getUnprocessedDocuments = (): Promise[]> => + { + return Promise.resolve( given ); + } + + publisher.publish = ( delta, bucket, doc_id ): Promise => + { + published.push( { + delta: delta.data, + bucket: bucket, + doc_id: doc_id, + } ); + + return Promise.resolve(); + } + + return expect( new Sut( dao, publisher, emitter ).process() ) + .to.eventually.deep.equal( undefined ) + .then( _ => expect( published ).to.deep.equal( expected ) ); + } ) ); + } ); } ); @@ -304,9 +486,9 @@ function createMockDeltaDao(): DeltaDao { return { getUnprocessedDocuments() { return Promise.resolve( [] ); }, - advanceDeltaIndex() { return Promise.resolve( null ); }, - markDocumentAsProcessed() { return Promise.resolve( null ); }, - setErrorFlag() { return Promise.resolve( null ); }, + advanceDeltaIndex() { return Promise.resolve(); }, + markDocumentAsProcessed() { return Promise.resolve(); }, + setErrorFlag() { return Promise.resolve(); }, getErrorCount() { return Promise.resolve( 0 ); }, }; } @@ -315,6 +497,6 @@ function createMockDeltaDao(): DeltaDao function createMockDeltaPublisher(): AmqpPublisher { return { - publish() { return Promise.resolve( null ); }, + publish() { return Promise.resolve(); }, }; } diff --git a/test/system/DeltaPublisherTest.ts b/test/system/DeltaPublisherTest.ts index cf3872f..40663d0 100644 --- a/test/system/DeltaPublisherTest.ts +++ b/test/system/DeltaPublisherTest.ts @@ -168,7 +168,7 @@ describe( 'server.DeltaPublisher', () => } ); - describe( '#avroFormat formats', () => + describe( '#setDataTypes annotates', () => { [ { @@ -304,7 +304,7 @@ describe( 'server.DeltaPublisher', () => const emitter = {} const conf = createMockConf(); const sut = new Sut( conf, emitter, ts_ctr ); - const actual = sut.avroFormat( delta_data ); + const actual = sut.setDataTypes( delta_data ); expect( actual ).to.deep.equal( expected ); } ); diff --git a/test/system/EventLoggerTest.ts b/test/system/EventLoggerTest.ts new file mode 100644 index 0000000..b3d5f0f --- /dev/null +++ b/test/system/EventLoggerTest.ts @@ -0,0 +1,103 @@ +/** + * Event logger test + * + * Copyright (C) 2010-2019 R-T Specialty, LLC. + * + * This file is part of the Liza Data Collection Framework. + * + * liza is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +import { EventLogger as Sut } from '../../src/system/EventLogger'; +import { EventEmitter } from "events"; +import { expect } from 'chai'; + +const sinon = require( 'sinon' ); + +declare interface MockConsole extends Console { + getLevel(): string, +} + +describe( 'system.EventLogger captures and logs events', () => +{ + [ + { + event_id: 'document-processed', + console_level: 'log', + }, + { + event_id: 'delta-publish', + console_level: 'log', + }, + { + event_id: 'amqp-conn-error', + console_level: 'warn', + }, + { + event_id: 'amqp-reconnect', + console_level: 'warn', + }, + { + event_id: 'amqp-reconnect-fail', + console_level: 'error', + }, + { + event_id: 'avro-err', + console_level: 'error', + }, + { + event_id: 'dao-err', + console_level: 'error', + }, + { + event_id: 'publish-err', + console_level: 'error', + }, + ].forEach( ( { event_id, console_level } ) => + { + it( event_id + ' triggers console output level: ' + console_level, () => + { + const emitter = new EventEmitter(); + const con = createMockConsole(); + const env = 'test'; + + new Sut( con, env, emitter, ts_ctr ); + + emitter.emit( event_id ); + + expect( con.getLevel() ).to.equal( console_level ); + } ); + } ); +} ); + + +function ts_ctr(): UnixTimestamp +{ + return Math.floor( new Date().getTime() / 1000 ); +} + + +function createMockConsole(): MockConsole +{ + const mock = sinon.mock( console ); + + mock.level = ''; + mock.info = ( _str: string ) => { mock.level = 'info'; }; + mock.log = ( _str: string ) => { mock.level = 'log'; }; + mock.warn = ( _str: string ) => { mock.level = 'warn'; }; + mock.error = ( _str: string ) => { mock.level = 'error'; }; + mock.getLevel = () => mock.level; + + return mock; +} \ No newline at end of file diff --git a/test/system/MetricsCollectorTest.ts b/test/system/MetricsCollectorTest.ts new file mode 100644 index 0000000..07867a8 --- /dev/null +++ b/test/system/MetricsCollectorTest.ts @@ -0,0 +1,157 @@ +/** + * Metrics collector test + * + * Copyright (C) 2010-2019 R-T Specialty, LLC. + * + * This file is part of the Liza Data Collection Framework. + * + * liza is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +import { PrometheusFactory } from '../../src/system/PrometheusFactory'; +import { Histogram, Pushgateway, Counter, Gauge } from 'prom-client'; +import { EventEmitter } from 'events'; +import { expect } from 'chai'; +import { + MetricsCollector as Sut, + PrometheusConfig, + MetricTimer, +} from '../../src/system/MetricsCollector'; + +const sinon = require( 'sinon' ); + +describe( 'system.MetricsCollector captures events and pushes metrics', () => +{ + it( 'process-complete event is hooked', () => + { + let histogram_called = false; + let counter_called = false; + + const emitter = new EventEmitter(); + const conf = createMockConfig(); + const timer = createMockTimer(); + const factory = createMockFactory( { + histogram_cb: () => { histogram_called = true }, + counter_cb: () => { counter_called = true }, + } ); + + new Sut( factory, conf, emitter, timer ); + + emitter.emit( 'delta-process-end' ); + + expect( histogram_called ).to.be.true; + expect( counter_called ).to.be.true; + } ); + + + it( 'process-error event is hooked', () => + { + let counter_called = false; + + const emitter = new EventEmitter(); + const conf = createMockConfig(); + const timer = createMockTimer(); + const factory = createMockFactory( { + counter_cb: () => { counter_called = true }, + } ); + + new Sut( factory, conf, emitter, timer ); + + emitter.emit( 'delta-process-error' ); + + expect( counter_called ).to.be.true; + } ); + + + it( 'process-complete is timed properly', () => + { + let actual_ms = 0; + const uid = 'foo'; + const start_time_ns = 1234; + const end_time_ns = 5678; + const expected_ms = ( end_time_ns - start_time_ns ) / 1000; + const emitter = new EventEmitter(); + const conf = createMockConfig(); + const timer = createMockTimer( start_time_ns, end_time_ns ); + const factory = createMockFactory( { + histogram_cb: ( n: number ) => { actual_ms = n }, + } ); + + new Sut( factory, conf, emitter, timer ); + + emitter.emit( 'delta-process-start', uid ); + emitter.emit( 'delta-process-end', uid ); + + expect( actual_ms ).to.be.equal( expected_ms ); + } ); +} ); + + +function createMockFactory( + { + gateway_cb = () => {}, + counter_cb = () => {}, + histogram_cb = ( _n: number = 0 ) => {}, + gauge_cb = ( _n: number = 0 ) => {}, + }: + { + gateway_cb ?: () => void; + counter_cb ?: () => void; + histogram_cb ?: ( _n: number ) => void; + gauge_cb ?: ( _n: number ) => void; + } +): PrometheusFactory +{ + const gateway = sinon.mock( Pushgateway ); + const counter = sinon.mock( Counter ); + const histogram = sinon.mock( Histogram ); + const gauge = sinon.mock( Gauge ); + + gateway.pushAdd = gateway_cb; + counter.inc = counter_cb; + histogram.observe = histogram_cb; + gauge.set = gauge_cb; + + return { + createGateway() { return gateway }, + createCounter() { return counter }, + createHistogram(){ return histogram }, + createGauge() { return gauge }, + }; +} + + +function createMockConfig(): PrometheusConfig +{ + return { + hostname: 'foo.com', + port: 123, + env: 'test', + push_interval_ms: 1000, + } +} + + +function createMockTimer( _start: number = 0, _end: number = 0 ): MetricTimer +{ + return ( _start_time?: [ number, number ] ) => + { + if ( !_start_time ) + { + return [ 0, _start ]; + } + + return [ 0, _end - _start_time[ 1 ] ]; + }; +} \ No newline at end of file