From d9ee999adbfbd22ed7a9f67766d0ce243c30bd90 Mon Sep 17 00:00:00 2001 From: Austin Schaffer Date: Mon, 2 Dec 2019 10:00:23 -0500 Subject: [PATCH] [DEV-5312] Refactor class dependencies Move create config functions out of the main delta-processor.ts file. Change some any types to more specific types. Define document structure better. Move prometheus logic out of processor class. Do not call logger directly, listen for events and log them. Change logger to more PSR-3 compliant style. Extract amqp connection into its own class. Create avro and mongo objects in factory. --- .env | 8 +- bin/delta-processor.ts | 303 +++++---------------- src/bucket/delta.ts | 56 +++- src/server/Server.js | 2 - src/server/db/MongoServerDao.ts | 27 +- src/system/AmqpPublisher.ts | 42 ++- src/system/DeltaProcessor.ts | 87 +++--- src/system/DeltaPublisher.ts | 389 +++++++++++---------------- src/system/EventLogger.ts | 151 ----------- src/system/EventMediator.ts | 88 ++++++ src/system/MetricsCollector.ts | 58 ++-- src/system/PrometheusFactory.ts | 36 +++ src/system/PsrLogger.ts | 117 ++++++++ src/system/StandardLogger.ts | 198 ++++++++++++++ src/system/amqp/AmqpConnection.ts | 154 +++++++++++ src/system/avro/AvroFactory.ts | 90 +++++++ src/system/avro/schema.avsc | 25 +- src/system/db/DeltaDao.ts | 7 +- src/system/db/MongoDeltaDao.ts | 33 +-- src/system/db/MongoFactory.ts | 176 ++++++++++++ src/types/mongodb.d.ts | 19 ++ test/server/db/MongoServerDaoTest.js | 157 ----------- test/server/db/MongoServerDaoTest.ts | 7 +- test/system/DeltaProcessorTest.ts | 312 +++++++++++++++++---- test/system/DeltaPublisherTest.ts | 221 +++++++++++---- test/system/EventLoggerTest.ts | 103 ------- test/system/EventMediatorTest.ts | 139 ++++++++++ test/system/MetricsCollectorTest.ts | 26 +- test/system/StandardLoggerTest.ts | 178 ++++++++++++ 29 files changed, 2077 insertions(+), 1132 deletions(-) delete mode 100644 src/system/EventLogger.ts create mode 100644 src/system/EventMediator.ts create mode 100644 src/system/PsrLogger.ts create mode 100644 src/system/StandardLogger.ts create mode 100644 src/system/amqp/AmqpConnection.ts create mode 100644 src/system/avro/AvroFactory.ts create mode 100644 src/system/db/MongoFactory.ts delete mode 100644 test/server/db/MongoServerDaoTest.js delete mode 100644 test/system/EventLoggerTest.ts create mode 100644 test/system/EventMediatorTest.ts create mode 100644 test/system/StandardLoggerTest.ts diff --git a/.env b/.env index acb0b79..0bf93e2 100644 --- a/.env +++ b/.env @@ -1,15 +1,15 @@ NODE_ENV=dev amqp_hostname=localhost amqp_port=5672 -amqp_username=quote_referral +amqp_username= amqp_password= amqp_frameMax=0 amqp_heartbeat=2 -amqp_vhost=quote -amqp_exchange=quoteupdate +amqp_vhost= +amqp_exchange= amqp_retries=30 amqp_retry_wait=1 -prom_hostname=dmz2docker01.rsgcorp.local +prom_hostname= prom_port=9091 prom_push_interval_ms=5000 process_interval_ms=2000 diff --git a/bin/delta-processor.ts b/bin/delta-processor.ts index 40842de..522b98b 100644 --- a/bin/delta-processor.ts +++ b/bin/delta-processor.ts @@ -18,34 +18,43 @@ * You should have received a copy of the GNU General Public License * along with this program. If not, see . */ -import fs = require( 'fs' ); - -import { AmqpConfig } from '../src/system/AmqpPublisher'; +import { createAmqpConfig } from '../src/system/AmqpPublisher'; import { MongoDeltaDao } from '../src/system/db/MongoDeltaDao'; import { DeltaProcessor } from '../src/system/DeltaProcessor'; import { DeltaPublisher } from '../src/system/DeltaPublisher'; -import { MongoDb, MongoDbConfig, MongoCollection } from '../src/types/mongodb'; -import { EventLogger } from '../src/system/EventLogger'; -import { EventEmitter } from 'events'; -import { PrometheusFactory } from '../src/system/PrometheusFactory'; +import { MongoCollection } from '../src/types/mongodb'; +import { createAvroEncoder } from '../src/system/avro/AvroFactory'; import { - MetricsCollector, - PrometheusConfig, -} from '../src/system/MetricsCollector'; + createMongoConfig, + createMongoDB, + getMongoCollection, +} from '../src/system/db/MongoFactory'; +import { EventMediator } from '../src/system/EventMediator'; +import { EventEmitter } from 'events'; +import { StandardLogger } from '../src/system/StandardLogger'; +import { MetricsCollector } from '../src/system/MetricsCollector'; +import { + PrometheusFactory, + createPrometheusConfig, +} from '../src/system/PrometheusFactory'; +import { AmqpConnection } from '../src/system/amqp/AmqpConnection'; -const { - Db: MongoDb, - Server: MongoServer, - ReplServers: ReplSetServers, -} = require( 'mongodb' ); -const amqp_conf = _getAmqpConfig( process.env ); -const db_conf = _getMongoConfig( process.env ); -const prom_conf = _getPrometheusConfig( process.env ); +const amqp_conf = createAmqpConfig( process.env ); +const prom_conf = createPrometheusConfig( process.env ); +const db_conf = createMongoConfig( process.env ); +const db = createMongoDB( db_conf ); +const process_interval_ms = +( process.env.process_interval_ms || 2000 ); const env = process.env.NODE_ENV || 'Unknown Environment'; -const process_interval_ms = +(process.env.process_interval_ms || 2000); const emitter = new EventEmitter(); -const db = _createDB( db_conf ); +const log = new StandardLogger( console, ts_ctr, env ); +const amqp_connection = new AmqpConnection( amqp_conf, emitter ); +const publisher = new DeltaPublisher( + emitter, + ts_ctr, + createAvroEncoder, + amqp_connection, +); // Prometheus Metrics const prom_factory = new PrometheusFactory(); @@ -57,67 +66,47 @@ const metrics = new MetricsCollector( ); // Structured logging -new EventLogger( console, env, emitter, ts_ctr ); +new EventMediator( log, emitter ); let process_interval: NodeJS.Timer; let dao: MongoDeltaDao; -let publisher: DeltaPublisher; -let processor: DeltaProcessor; -_getMongoCollection( db, db_conf ) +getMongoCollection( db, db_conf ) .then( ( conn: MongoCollection ) => { return new MongoDeltaDao( conn ); } ) - .then( ( mongoDao: MongoDeltaDao ) => - { - dao = mongoDao; - publisher = new DeltaPublisher( amqp_conf, emitter, ts_ctr ); - processor = new DeltaProcessor( mongoDao, publisher, emitter ); - } ) - .then( _ => publisher.connect() ) + .then( ( mongoDao: MongoDeltaDao ) => { dao = mongoDao; } ) + .then( _ => amqp_connection.connect() ) .then( _ => - { - const pidPath = __dirname + '/../conf/.delta_processor.pid'; + { + log.info( 'Liza Delta Processor' ); - writePidFile(pidPath ); - greet( 'Liza Delta Processor', pidPath ); + handleShutdown(); - process_interval = setInterval( () => + const processor = new DeltaProcessor( dao, publisher, emitter ); + + process_interval = setInterval( () => { processor.process(); - metrics.checkForErrors( dao ); + + dao.getErrorCount() + .then( count => { metrics.updateErrorCount( count ) } ); }, process_interval_ms, ); } ) - .catch( e => { console.error( 'Error: ' + e ); } ); + .catch( e => + { + log.error( e ); + process.exit( 1 ); + } ); /** - * Output greeting - * - * The greeting contains the program name and PID file path. - * - * @param name - program name - * @param pid_path - path to PID file + * Hook shutdown events */ -function greet( name: string, pid_path: string ): void +function handleShutdown(): void { - console.log( `${name}`); - console.log( `PID file: ${pid_path}` ); -} - - -/** - * Write process id (PID) file - * - * @param pid_path - path to pid file - */ -function writePidFile( pid_path: string ): void -{ - fs.writeFileSync( pid_path, process.pid ); - process.on( 'SIGINT', () => { shutdown( 'SIGINT' ); } ) - .on( 'SIGTERM', () => { shutdown( 'SIGTERM' ); } ) - .on( 'exit', () => { fs.unlink( pid_path, () => {} ); } ); + .on( 'SIGTERM', () => { shutdown( 'SIGTERM' ); } ); } @@ -128,12 +117,12 @@ function writePidFile( pid_path: string ): void */ function shutdown( signal: string ): void { - console.log( 'Received ' + signal + '. Beginning graceful shutdown:' ); - console.log( '...Stopping processing interval' ); + log.info( 'Received ' + signal + '. Beginning graceful shutdown:' ); + log.info( '...Stopping processing interval' ); clearInterval( process_interval ); - console.log( '...Closing MongoDb connection' ); + log.info( '...Closing MongoDb connection' ); db.close( ( err, _data ) => { @@ -143,11 +132,15 @@ function shutdown( signal: string ): void } } ); - console.log( '...Closing AMQP connection...' ); + log.info( '...Closing AMQP connection...' ); - publisher.close(); + amqp_connection.close(); - console.log( 'Shutdown complete. Exiting.' ); + log.info( '...Stopping the metrics collector...' ); + + metrics.stop(); + + log.info( 'Shutdown complete. Exiting.' ); process.exit(); } @@ -161,179 +154,3 @@ function ts_ctr(): UnixTimestamp { return Math.floor( new Date().getTime() / 1000 ); } - - -/** - * Create the database connection - * - * @param conf - the configuration from the environment - * - * @return the mongodb connection - */ -function _createDB( conf: MongoDbConfig ): MongoDb -{ - if( conf.ha ) - { - var mongodbPort = conf.port || 27017; - var mongodbReplSet = conf.replset || 'rs0'; - var dbServers = new ReplSetServers( - [ - new MongoServer( conf.host_a, conf.port_a || mongodbPort), - new MongoServer( conf.host_b, conf.port_b || mongodbPort), - ], - {rs_name: mongodbReplSet, auto_reconnect: true} - ); - } - else - { - var dbServers = new MongoServer( - conf.host || '127.0.0.1', - conf.port || 27017, - {auto_reconnect: true} - ); - } - var db = new MongoDb( - 'program', - dbServers, - {native_parser: false, safe: false} - ); - return db; -} - - -/** - * Attempts to connect to the database - * - * connectError event will be emitted on failure. - * - * @return any errors that occurred - */ -function _getMongoCollection( - db: MongoDb, - conf: MongoDbConfig -): Promise -{ - return new Promise( ( resolve, reject ) => - { - // attempt to connect to the database - db.open( ( e: any, db: any ) => - { - // if there was an error, don't bother with anything else - if ( e ) - { - // in some circumstances, it may just be telling us that - // we're already connected (even though the connection may - // have been broken) - if ( e.errno !== undefined ) - { - reject( 'Error opening mongo connection: ' + e ); - return; - } - } else if ( db == null ) - { - reject( 'No database connection' ); - return; - } - - // quotes collection - db.collection( - conf.collection, - ( e: any, collection: MongoCollection ) => - { - if ( e ) - { - reject( 'Error creating collection: ' + e ); - return; - } - - // initialize indexes - collection.createIndex( - [ - ['published', 1], - ['deltaError', 1], - ], - true, - ( e: any, _index: { [P: string]: any } ) => - { - if ( e ) - { - reject( 'Error creating index: ' + e ); - return; - } - - resolve( collection ); - return; - } - ); - } - ); - } ); - } ); -} - - -/** - * Create a mongodb configuration from the environment - * - * @param env - the environment variables - * - * @return the mongo configuration - */ -function _getMongoConfig( env: any ): MongoDbConfig -{ - return { - 'port': +( env.MONGODB_PORT || 0 ), - 'ha': +( env.LIZA_MONGODB_HA || 0 ) == 1, - 'replset': env.LIZA_MONGODB_REPLSET, - 'host': env.MONGODB_HOST, - 'host_a': env.LIZA_MONGODB_HOST_A, - 'port_a': +( env.LIZA_MONGODB_PORT_A || 0 ), - 'host_b': env.LIZA_MONGODB_HOST_B, - 'port_b': +( env.LIZA_MONGODB_PORT_B || 0 ), - 'collection': 'quotes', - }; -} - - -/** - * Create an amqp configuration from the environment - * - * @param env - the environment variables - * - * @return the amqp configuration - */ -function _getAmqpConfig( env: any ): AmqpConfig -{ - return { - 'protocol': 'amqp', - 'hostname': env.amqp_hostname, - 'port': +( env.amqp_port || 0 ), - 'username': env.amqp_username, - 'password': env.amqp_password, - 'locale': 'en_US', - 'frameMax': env.amqp_frameMax, - 'heartbeat': env.amqp_heartbeat, - 'vhost': env.amqp_vhost, - 'exchange': env.amqp_exchange, - 'retries': env.amqp_retries || 30, - 'retry_wait': env.amqp_retry_wait || 1000, - }; -} - - -/** - * Create a prometheus configuration from the environment - * - * @param env - the environment variables - * - * @return the prometheus configuration - */ -function _getPrometheusConfig( env: any ): PrometheusConfig -{ - return { - 'hostname': env.prom_hostname, - 'port': +( env.prom_port || 0 ), - 'env': process.env.NODE_ENV, - 'push_interval_ms': +( process.env.prom_push_interval_ms || 5000 ), - }; -} \ No newline at end of file diff --git a/src/bucket/delta.ts b/src/bucket/delta.ts index ef2ab77..b83a8e7 100644 --- a/src/bucket/delta.ts +++ b/src/bucket/delta.ts @@ -18,16 +18,21 @@ * You should have received a copy of the GNU General Public License * along with this program. If not, see . */ +import { DocumentId } from '../document/Document'; + /** The data structure expected for a document's internal key/value store */ export type Kv = Record; + /** Possible delta values for Kv array indexes */ export type DeltaDatum = T | null | undefined; + /** Possible delta types */ export type DeltaType = 'ratedata' | 'data'; + /** * The constructor type for a delta generating function * @@ -46,6 +51,53 @@ export type DeltaConstructor = Kv, V extends Kv = export type DeltaResult = { [K in keyof T]: DeltaDatum | null }; +/** Complete delta type */ +export type Delta = { + type: DeltaType, + timestamp: UnixTimestamp, + data: DeltaResult, +} + + +/** Reverse delta type */ +export type ReverseDelta = { + data: Delta[], + ratedata: Delta[], +} + + +/** Structure for Published delta count */ +export type PublishDeltaCount = { + data?: number, + ratedata?: number, +} + + +/** + * Document structure + */ +export interface DeltaDocument +{ + /** The document id */ + id: DocumentId, + + /** The time the document was updated */ + lastUpdate: UnixTimestamp, + + /** The data bucket */ + data: Record, + + /** The rate data bucket */ + ratedata?: Record, + + /** The calculated reverse deltas */ + rdelta?: ReverseDelta, + + /** A count of how many of each delta type have been processed */ + totalPublishDelta?: PublishDeltaCount, +}; + + /** * Create delta to transform from src into dest * @@ -105,7 +157,7 @@ export function createDelta, V extends Kv>( * @param bucket - The bucket data * @param delta - The delta to apply * - * @return the delta + * @return the bucket with the delta applied */ export function applyDelta, V extends Kv>( bucket: U = {}, @@ -164,7 +216,7 @@ export function applyDelta, V extends Kv>( * @param bucket - The bucket data array * @param delta - The delta data array * - * @return an object with an changed flag and a data array + * @return the applied delta */ function _applyDeltaKey( bucket: T[], diff --git a/src/server/Server.js b/src/server/Server.js index 8651f9d..d839d7b 100644 --- a/src/server/Server.js +++ b/src/server/Server.js @@ -340,7 +340,6 @@ module.exports = Class( 'Server' ) .setImported( quote_data.importedInd || false ) .setBound( quote_data.boundInd || false ) .needsImport( quote_data.importDirty || false ) - .needsDeltaProcessing( quote_data.processed || true ) .setCurrentStepId( quote_data.currentStepId || quote_program.getFirstStepId() @@ -393,7 +392,6 @@ module.exports = Class( 'Server' ) importedInd: ( quote.isImported() ) ? 1 : 0, boundInd: ( quote.isBound() ) ? 1 : 0, importDirty: 0, - published: 1, syncInd: 0, boundInd: 0, notifyInd: 0, diff --git a/src/server/db/MongoServerDao.ts b/src/server/db/MongoServerDao.ts index ef129b8..dd1df2d 100644 --- a/src/server/db/MongoServerDao.ts +++ b/src/server/db/MongoServerDao.ts @@ -20,7 +20,7 @@ */ import { ServerDao, Callback } from "./ServerDao"; -import { MongoCollection, MongoUpdate } from "mongodb"; +import { MongoCollection, MongoUpdate, MongoDb } from "mongodb"; import { PositiveInteger } from "../../numeric"; import { ServerSideQuote } from "../quote/ServerSideQuote"; import { QuoteId } from "../../document/Document"; @@ -64,7 +64,7 @@ export class MongoServerDao extends EventEmitter implements ServerDao * @param {Mongo.Db} db mongo database connection */ constructor( - private readonly _db: any + private readonly _db: MongoDb ) { super(); @@ -86,7 +86,7 @@ export class MongoServerDao extends EventEmitter implements ServerDao var dao = this; // map db error event (on connection error) to our connectError event - this._db.on( 'error', function( err: any ) + this._db.on( 'error', function( err: Error ) { dao._ready = false; dao._collection = null; @@ -165,7 +165,10 @@ export class MongoServerDao extends EventEmitter implements ServerDao collection.createIndex( [ ['id', 1] ], true, - function( _err: any, _index: { [P: string]: any } ) + function( + _err: NullableError, + _index: { [P: string]: any, + } ) { // mark the DAO as ready to be used dao._collection = collection; @@ -179,7 +182,7 @@ export class MongoServerDao extends EventEmitter implements ServerDao db.collection( dao.COLLECTION_SEQ, function( - err: any, + err: Error, collection: MongoCollection, ) { if ( err ) @@ -199,7 +202,7 @@ export class MongoServerDao extends EventEmitter implements ServerDao collection.find( { _id: dao.SEQ_QUOTE_ID }, { limit: 1 }, - function( err: any, cursor ) + function( err: NullableError, cursor ) { if ( err ) { @@ -207,7 +210,7 @@ export class MongoServerDao extends EventEmitter implements ServerDao return; } - cursor.toArray( function( _err: any, data: any[] ) + cursor.toArray( function( _err: Error, data: any[] ) { if ( data.length == 0 ) { @@ -236,7 +239,7 @@ export class MongoServerDao extends EventEmitter implements ServerDao _id: this.SEQ_QUOTE_ID, val: this.SEQ_QUOTE_ID_DEFAULT, }, - function( err: any, _docs: any ) + function( err: NullableError, _docs: any ) { if ( err ) { @@ -467,8 +470,8 @@ export class MongoServerDao extends EventEmitter implements ServerDao */ saveQuoteState( quote: ServerSideQuote, - success_callback: any, - failure_callback: any, + success_callback: Callback, + failure_callback: Callback, ) { var update = { @@ -486,8 +489,8 @@ export class MongoServerDao extends EventEmitter implements ServerDao saveQuoteClasses( quote: ServerSideQuote, classes: any, - success: any, - failure: any, + success: Callback, + failure: Callback, ) { return this.mergeData( diff --git a/src/system/AmqpPublisher.ts b/src/system/AmqpPublisher.ts index 12f521c..1f728b3 100644 --- a/src/system/AmqpPublisher.ts +++ b/src/system/AmqpPublisher.ts @@ -26,6 +26,32 @@ import { DocumentId } from '../document/Document'; import { Options } from 'amqplib'; +/** + * Create an amqp configuration from the environment + * + * @param env - the environment variables + * + * @return the amqp configuration + */ +export function createAmqpConfig( env: NodeJS.ProcessEnv ): AmqpConfig +{ + return { + protocol: 'amqp', + hostname: env.amqp_hostname, + port: +( env.amqp_port || 0 ), + username: env.amqp_username, + password: env.amqp_password, + locale: 'en_US', + frameMax: +( env.amqp_frameMax || 0 ), + heartbeat: +( env.amqp_heartbeat || 0 ), + vhost: env.amqp_vhost, + exchange: env.amqp_exchange, + retries: env.amqp_retries || 30, + retry_wait: env.amqp_retry_wait || 1000, + }; +} + + export interface AmqpConfig extends Options.Connect { /** The protocol to connect with (should always be "amqp") */ protocol: string; @@ -49,7 +75,7 @@ export interface AmqpConfig extends Options.Connect { frameMax: number; /** How often to check for a live connection */ - heartBeat: number; + heartbeat: number; /** The virtual host we are on (e.g. live, demo, test) */ vhost?: string; @@ -70,13 +96,15 @@ export interface AmqpPublisher /** * Publish quote message to exchange post-rating * - * @param delta - The delta - * @param bucket - The bucket - * @param doc_id - The doc_id + * @param doc_id - The doc_id + * @param delta - The delta + * @param bucket - The bucket + * @param ratedata - The rate data bucket */ publish( - delta: DeltaResult, - bucket: Record, - doc_id: DocumentId, + doc_id: DocumentId, + delta: DeltaResult, + bucket: Record, + ratedata?: Record, ): Promise } diff --git a/src/system/DeltaProcessor.ts b/src/system/DeltaProcessor.ts index d8da5cb..b136f80 100644 --- a/src/system/DeltaProcessor.ts +++ b/src/system/DeltaProcessor.ts @@ -20,10 +20,16 @@ */ import { DeltaDao } from "../system/db/DeltaDao"; -import { DeltaResult, DeltaType, applyDelta } from "../bucket/delta"; import { DocumentId } from "../document/Document"; import { AmqpPublisher } from "./AmqpPublisher"; import { EventEmitter } from "events"; +import { + DeltaType, + applyDelta, + DeltaDocument, + Delta, + ReverseDelta, +} from "../bucket/delta"; /** @@ -58,81 +64,84 @@ export class DeltaProcessor process(): Promise { return this._dao.getUnprocessedDocuments() - .then( docs => this._processNext( docs ) ) - .catch( err => { this._emitter.emit( 'dao-err', err ) } ); + .then( docs => this._processNext( docs ) ); } - private _processNext( docs: any ): Promise + private _processNext( docs: DeltaDocument[] ): Promise { - if ( docs.length === 0 ) + const doc = docs.shift(); + + if ( !doc ) { return Promise.resolve(); } - const doc = docs.shift(); - return this._processDocument( doc ) - .then( _ => this._processNext( docs ) ); + .then( _ => this._processNext( docs ) ) } - private _processDocument( doc: Record ): Promise + private _processDocument( doc: DeltaDocument ): Promise { - const deltas = this.getTimestampSortedDeltas( doc ); - const doc_id: DocumentId = doc.id; - const bucket = doc.data; - const last_updated_ts = doc.lastUpdate; + const deltas = this.getTimestampSortedDeltas( doc ); + const doc_id = doc.id; + const bucket = doc.data; + const ratedata = doc.ratedata; + const last_updated_ts = doc.lastUpdate; - return this._processNextDelta( deltas, bucket, doc_id ) + return this._processNextDelta( doc_id, deltas, bucket, ratedata ) .then( _ => this._dao.markDocumentAsProcessed( doc_id, last_updated_ts ) ) .then( _ => { - this._emitter.emit( - 'document-processed', - 'Deltas on document ' + doc_id + ' processed ' - + 'successfully. Document has been marked as ' - + 'completely processed.' - ); + this._emitter.emit( 'document-processed', { doc_id: doc_id } ); } ) .catch( e => { - this._emitter.emit( 'delta-err', e ); - this._dao.setErrorFlag( doc_id ); + this._emitter.emit( 'error', e ); + return this._dao.setErrorFlag( doc_id ); } ); } private _processNextDelta( - deltas: DeltaResult[], - bucket: Record, - doc_id: DocumentId, + doc_id: DocumentId, + deltas: Delta[], + bucket: Record, + ratedata?: Record, ): Promise { - if ( deltas.length === 0 ) - { - return Promise.resolve(); - } - const delta = deltas.shift(); if ( !delta ) { - return Promise.reject( new Error( 'Undefined delta' ) ); + return Promise.resolve(); } const delta_uid = doc_id + '_' + delta.timestamp + '_' + delta.type; this._emitter.emit( 'delta-process-start', delta_uid ); - const new_bucket = applyDelta( bucket, delta.data ); + if ( delta.type == this.DELTA_DATA ) + { + bucket = applyDelta( bucket, delta.data ); + } + else + { + ratedata = applyDelta( ratedata, delta.data ); + } - return this._publisher.publish( delta, new_bucket, doc_id ) + return this._publisher.publish( doc_id, delta, bucket, ratedata ) .then( _ => this._dao.advanceDeltaIndex( doc_id, delta.type ) ) .then( _ => this._emitter.emit( 'delta-process-end', delta_uid ) ) - .then( _ => this._processNextDelta( deltas, new_bucket, doc_id ) ); + .then( _ => this._processNextDelta( + doc_id, + deltas, + bucket, + ratedata + ) ); } @@ -144,7 +153,7 @@ export class DeltaProcessor * * @return a list of deltas sorted by timestamp */ - getTimestampSortedDeltas( doc: any ): DeltaResult[] + getTimestampSortedDeltas( doc: DeltaDocument ): Delta[] { const data_deltas = this.getDeltas( doc, this.DELTA_RATEDATA ); const ratedata_deltas = this.getDeltas( doc, this.DELTA_DATA ); @@ -164,10 +173,10 @@ export class DeltaProcessor * * @return a trimmed list of deltas */ - getDeltas( doc: any, type: DeltaType ): DeltaResult[] + getDeltas( doc: DeltaDocument, type: DeltaType ): Delta[] { - const deltas_obj = doc.rdelta || {}; - const deltas: DeltaResult[] = deltas_obj[ type ] || []; + const deltas_obj = doc.rdelta || >{}; + const deltas: Delta[] = deltas_obj[ type ] || []; // Get type specific delta index let published_count = 0; @@ -197,7 +206,7 @@ export class DeltaProcessor * * @return a sort value */ - private _sortByTimestamp( a: DeltaResult, b: DeltaResult ): number + private _sortByTimestamp( a: Delta, b: Delta ): number { if ( a.timestamp < b.timestamp ) { diff --git a/src/system/DeltaPublisher.ts b/src/system/DeltaPublisher.ts index a5b68b8..94c925c 100644 --- a/src/system/DeltaPublisher.ts +++ b/src/system/DeltaPublisher.ts @@ -21,37 +21,22 @@ * Publish delta message to a queue */ -import { AmqpPublisher, AmqpConfig } from './AmqpPublisher'; -import { DeltaResult } from '../bucket/delta'; +import { AmqpPublisher } from './AmqpPublisher'; +import { Delta } from '../bucket/delta'; import { EventEmitter } from "events"; import { DocumentId } from '../document/Document'; import { context } from '../error/ContextError'; import { AmqpError } from '../error/AmqpError'; -import { - connect as amqpConnect, - Channel, - Connection, -} from 'amqplib'; +import { AvroSchema, AvroEncoderCtr } from './avro/AvroFactory'; +import { AmqpConnection } from './amqp/AmqpConnection'; + const avro = require( 'avro-js' ); - -export interface AvroSchema { - /** Write data to a buffer */ - toBuffer( data: Record ): Buffer | null; -} - - export class DeltaPublisher implements AmqpPublisher { - /** The amqp connection */ - private _conn?: Connection; - - /** The amqp channel */ - private _channel?: Channel; - /** The avro schema */ - private _type?: AvroSchema; + private _schema: AvroSchema; /** The path to the avro schema */ readonly SCHEMA_PATH = __dirname + '/avro/schema.avsc'; @@ -66,122 +51,45 @@ export class DeltaPublisher implements AmqpPublisher /** * Delta publisher * - * @param _conf - amqp configuration - * @param _emitter - event emitter instance - * @param _ts_ctr - a timestamp constructor + * @param _emitter - event emitter instance + * @param _ts_ctr - a timestamp constructor + * @param _encoder_ctr - a factory function to create an avro encoder + * @param _conn - the amqp connection */ constructor( - private readonly _conf: AmqpConfig, - private readonly _emitter: EventEmitter, - private readonly _ts_ctr: () => UnixTimestamp, + private readonly _emitter: EventEmitter, + private readonly _ts_ctr: () => UnixTimestamp, + private readonly _encoder_ctr: AvroEncoderCtr, + private readonly _conn: AmqpConnection, ) { - this._type = avro.parse( this.SCHEMA_PATH ); - } - - - /** - * Initialize connection - */ - connect(): Promise - { - return amqpConnect( this._conf ) - .then( conn => - { - this._conn = conn; - - // If there is an error, attempt to reconnect - this._conn.on( 'error', e => - { - this._emitter.emit( 'amqp-conn-error', e ); - this._reconnect(); - } ); - - return this._conn.createChannel(); - } ) - .then( ( ch: Channel ) => - { - this._channel = ch; - - this._channel.assertExchange( - this._conf.exchange, - 'fanout', - { durable: true } - ); - } ); - } - - - /** - * Attempt to re-establish the connection - * - * @return Whether reconnecting was successful - */ - private _reconnect( retry_count: number = 0 ): void - { - if ( retry_count >= this._conf.retries ) - { - this._emitter.emit( - 'amqp-reconnect-fail', - 'Could not re-establish AMQP connection.' - ); - - return; - } - - this._emitter.emit( - 'amqp-reconnect', - '...attempting to re-establish AMQP connection' - ); - - this.connect() - .then( _ => - { - this._emitter.emit( - 'amqp-reconnect', - 'AMQP re-connected' - ); - } ) - .catch( _ => - { - const wait_ms = this._conf.retry_wait; - setTimeout( () => this._reconnect( ++retry_count ), wait_ms ); - } ); - } - - - /** - * Close the amqp conenction - */ - close(): void - { - if ( this._conn ) - { - this._conn.close.bind(this._conn); - } + this._schema = avro.parse( this.SCHEMA_PATH ); } /** * Publish quote message to exchange post-rating * - * @param delta - The delta - * @param bucket - The bucket - * @param doc_id - The doc_id + * @param doc_id - The doc_id + * @param delta - The delta + * @param bucket - The bucket + * @param ratedata - The ratedata bucket */ - publish( - delta: DeltaResult, - bucket: Record, - doc_id: DocumentId, + publish( + doc_id: DocumentId, + delta: Delta, + bucket: Record, + ratedata: Record = {}, ): Promise { - return this.sendMessage( delta, bucket, doc_id ) + return this._sendMessage( doc_id, delta, bucket, ratedata ) .then( _ => { this._emitter.emit( 'delta-publish', - "Published " + delta.type + " delta with ts '" - + delta.timestamp + "' to '" + this._conf.exchange - + '" exchange', + { + delta: delta, + exchange: this._conn.getExchangeName(), + } ); } ); } @@ -190,131 +98,155 @@ export class DeltaPublisher implements AmqpPublisher /** * Send message to exchange * - * @param delta - The delta to publish - * @param bucket - The bucket - * @param doc_id - The doc_id + * @param doc_id - The doc_id + * @param delta - The delta to publish + * @param bucket - The bucket + * @param ratedata - The ratedata bucket * * @return whether publish was successful */ - sendMessage( - delta: DeltaResult, - bucket: Record, - doc_id: DocumentId, + private _sendMessage( + doc_id: DocumentId, + delta: Delta, + bucket: Record, + ratedata: Record, ): Promise { - return new Promise( ( resolve, reject ) => - { - const ts = this._ts_ctr(); - const headers = { version: 1, created: ts }; - const avro_object = this.avroFormat( delta, bucket, doc_id, ts ); - const avro_buffer = this.avroEncode( avro_object ); + const ts = this._ts_ctr(); + const headers = { version: 1, created: ts }; + const avro_object = this._avroFormat( + ts, + doc_id, + delta, + bucket, + ratedata, + ); - if ( !this._conn ) + return this.avroEncode( avro_object ) + .then( ( avro_buffer ) => { - reject( context ( - new AmqpError( 'Error sending message: No connection' ), - { - doc_id: doc_id, - delta_type: delta.type, - delta_ts: delta.ts, - }, - ) ); - return; - } - else if ( !this._channel ) - { - reject( context ( - new AmqpError( 'Error sending message: No channel' ), - { - doc_id: doc_id, - delta_type: delta.type, - delta_ts: delta.ts, - }, - ) ); - return; - } - else if ( !avro_buffer ) - { - reject( context ( - new Error( 'Error sending message: No avro buffer' ), - { - doc_id: doc_id, - delta_type: delta.type, - delta_ts: delta.ts, - }, - ) ); - return; - } + const channel = this._conn.getAmqpChannel(); - // we don't use a routing key; fanout exchange - const published_successfully = this._channel.publish( - this._conf.exchange, - '', - avro_buffer, - { headers: headers }, - ); - - if ( published_successfully ) - { - resolve(); - return; - } - - reject( context( - new Error ( 'Error sending message: publishing failed' ), + if ( !channel ) { - doc_id: doc_id, - delta_type: delta.type, - delta_ts: delta.ts, + return Promise.reject( context ( + new AmqpError( 'Error sending message: No channel' ), + { + doc_id: doc_id, + delta_type: delta.type, + delta_ts: delta.timestamp, + }, + ) ); } - ) ); - } ); + + // we don't use a routing key; fanout exchange + const published_successfully = channel.publish( + this._conn.getExchangeName(), + '', + avro_buffer, + { headers: headers }, + ); + + if ( !published_successfully ) + { + return Promise.reject( context( + new Error ( 'Delta publish failed' ), + { + doc_id: doc_id, + delta_type: delta.type, + delta_ts: delta.timestamp, + } + ) ); + } + + return Promise.resolve(); + } ); } - avroFormat( - delta: DeltaResult, - _bucket: Record, - doc_id: DocumentId, - ts: UnixTimestamp, + /** + * Throw an error with specific information if the schema is invalid + * + * @param schema - Avro schema + * @param data - Data to encode + */ + private _assertValidAvro( + schema: AvroSchema, + data: Record, + ): void + { + schema.isValid( data, { errorHook: hook } ); + + function hook( keys: any, vals: any) { + throw context( new Error( 'Invalid Avro Schema' ), + { + invalid_paths: keys, + invalid_data: vals, + } + ); + } + } + + + /** + * Format the avro data with data type labels + * + * @param ts - a timestamp + * @param doc_id - the document id + * @param delta - the current delta + * @param bucket - the data bucket + * @param ratedata - the ratedata bucket + * + * @return the formatted data + */ + private _avroFormat( + ts: UnixTimestamp, + doc_id: DocumentId, + delta: Delta, + bucket: Record, + ratedata: Record, ): any { - const delta_data = this.setDataTypes( delta.data ); - const event_id = this.DELTA_MAP[ delta.type ]; + const delta_formatted = this.setDataTypes( delta.data ); + const bucket_formatted = this.setDataTypes( bucket ); + const ratedata_formatted = this.setDataTypes( ratedata ); + const event_id = this.DELTA_MAP[ delta.type ]; return { event: { - id: event_id, - ts: ts, + id: event_id, + ts: ts, actor: 'SERVER', - step: null, + step: null, }, document: { - id: doc_id - }, - session: { - entity_name: 'Foobar', // Fix - entity_id: 123123, // Fix + id: doc_id }, data: { Data: { - bucket: _bucket, + bucket: bucket_formatted, + }, + }, + ratedata: { + Data: { + bucket: ratedata_formatted, }, }, delta: { Data: { - bucket: delta_data, + bucket: delta_formatted, }, }, program: { Program: { id: 'quote_server', - version: 'dadaddwafdwa', // Fix + version: '', }, }, } } + /** * Encode the data in an avro buffer * @@ -322,33 +254,28 @@ export class DeltaPublisher implements AmqpPublisher * * @return the avro buffer or null if there is an error */ - avroEncode( data: Record ): Buffer | null + avroEncode( data: Record ): Promise { - let buffer = null; - - try + return new Promise( ( resolve, reject ) => { - if ( !this._type ) + const bufs: Buffer[] = []; + + try { - this._emitter.emit( - 'avro-err', - 'No avro scheama found', - ); + this._assertValidAvro( this._schema, data ) - return null; + const encoder = this._encoder_ctr( this._schema ) + + encoder.on('data', ( buf: Buffer ) => { bufs.push( buf ) } ) + encoder.on('error', ( err: Error ) => { reject( err ); } ) + encoder.on('end', () => { resolve( Buffer.concat( bufs ) ) } ) + encoder.end( data ); } - - buffer = this._type.toBuffer( data ); - } - catch( e ) - { - this._emitter.emit( - 'avro-err', - 'Error encoding data to avro: ' + e, - ); - } - - return buffer; + catch ( e ) + { + reject( e ); + } + } ); } @@ -365,7 +292,7 @@ export class DeltaPublisher implements AmqpPublisher switch( typeof( data ) ) { - case 'object': // Typescript treats arrays as objects + case 'object': if ( data == null ) { return null; diff --git a/src/system/EventLogger.ts b/src/system/EventLogger.ts deleted file mode 100644 index ef46aca..0000000 --- a/src/system/EventLogger.ts +++ /dev/null @@ -1,151 +0,0 @@ -/** - * Event logger - * - * Copyright (C) 2010-2019 R-T Specialty, LLC. - * - * This file is part of liza. - * - * liza is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - * - * PSR-12 style logger based on node events - */ - -import { EventEmitter } from "events"; - -enum LogLevel { - DEBUG, - INFO, - NOTICE, - WARNING, - ERROR, - CRITICAL, - ALERT, - EMERGENCY, -}; - -declare type StructuredLog = { - message: string; - timestamp: UnixTimestamp; - service: string; - env: string; - severity: string; -} - -export class EventLogger -{ - /** - * Initialize logger - * - * @param _env - The environment ( dev, test, demo, live ) - * @param _emitter - An event emitter - * @param _ts_ctr - a timestamp constructor - */ - constructor( - private readonly _console: Console, - private readonly _env: string, - private readonly _emitter: EventEmitter, - private readonly _ts_ctr: () => UnixTimestamp, - ) { - this.init(); - } - - - /** - * Initialize the logger to look for specific events - */ - init(): void - { - this._registerEvent( 'document-processed', LogLevel.NOTICE ); - this._registerEvent( 'delta-publish', LogLevel.NOTICE ); - this._registerEvent( 'amqp-conn-error', LogLevel.WARNING ); - this._registerEvent( 'amqp-reconnect', LogLevel.WARNING ); - this._registerEvent( 'amqp-reconnect-fail', LogLevel.ERROR ); - this._registerEvent( 'avro-err', LogLevel.ERROR ); - this._registerEvent( 'dao-err', LogLevel.ERROR ); - this._registerEvent( 'publish-err', LogLevel.ERROR ); - - // this._registerEvent( 'log', LogLevel.INFO ); - // this._registerEvent( 'debug', LogLevel.DEBUG ); - // this._registerEvent( 'info', LogLevel.INFO ); - // this._registerEvent( 'notice', LogLevel.NOTICE ); - // this._registerEvent( 'warning', LogLevel.WARNING ); - // this._registerEvent( 'error', LogLevel.ERROR ); - // this._registerEvent( 'critical', LogLevel.CRITICAL ); - // this._registerEvent( 'alert', LogLevel.ALERT ); - // this._registerEvent( 'emergency', LogLevel.EMERGENCY ); - } - - - /** - * Register an event at a specific log level - * - * @param event_id - the event id - * @param level - the log level - */ - private _registerEvent( event_id: string, level: LogLevel ): void - { - const logF = this._getLogLevelFunction( level ) - - this._emitter.on( event_id, logF ); - } - - - /** - * Get a logging function for the specified log level - * - * @param event_id - the event id - * - * @return the function to log with - */ - private _getLogLevelFunction( level: LogLevel ): ( str: string ) => void - { - switch( level ) - { - case LogLevel.DEBUG: - case LogLevel.INFO: - return ( str ) => this._console.info( this._format( str, level ) ); - case LogLevel.NOTICE: - return ( str ) => this._console.log( this._format( str, level ) ); - case LogLevel.WARNING: - return ( str ) => this._console.warn( this._format( str, level ) ); - case LogLevel.ERROR: - case LogLevel.CRITICAL: - case LogLevel.ALERT: - case LogLevel.EMERGENCY: - return ( str ) => this._console.error( this._format( str, level ) ); - default: - return ( str ) => this._console.log( "UNKNOWN LOG LEVEL: " + str ); - } - } - - - /** - * Get structured log object - * - * @param str - the string to log - * @param level - the log level - * - * @returns a structured logging object - */ - private _format( str: string, level: LogLevel ): StructuredLog - { - return { - message: str, - timestamp: this._ts_ctr(), - service: 'quote-server', - env: this._env, - severity: LogLevel[level], - }; - } -} diff --git a/src/system/EventMediator.ts b/src/system/EventMediator.ts new file mode 100644 index 0000000..31b245e --- /dev/null +++ b/src/system/EventMediator.ts @@ -0,0 +1,88 @@ +/** + * Event Meditator + * + * Copyright (C) 2010-2019 R-T Specialty, LLC. + * + * This file is part of liza. + * + * liza is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + * Hook events and log them + */ + +import { EventEmitter } from 'events'; +import { PsrLogger } from './PsrLogger'; +import { hasContext } from '../error/ContextError'; + +export class EventMediator +{ + /** + * Initialize mediator + * + * @param _log - A PSR-3 style logger + * @param _emitter - An event emitter + */ + constructor( + private readonly _log: PsrLogger, + private readonly _emitter: EventEmitter, + ) { + this._emitter.on( 'delta-publish', ( msg ) => this._log.notice( + 'Published delta to exchange', + msg + ) ); + + this._emitter.on( 'document-processed', ( msg ) => this._log.notice( + 'Deltas on document processed successfully. Document has been ' + + 'marked as completely processed.', + msg + ) ); + + this._emitter.on( 'amqp-conn-error', ( msg ) => + this._log.warning( 'AMQP Connection Error', msg ) ); + + this._emitter.on( 'amqp-reconnect', () => + this._log.warning( + '...attempting to re-establish AMQP connection' + ) + ); + + this._emitter.on( 'amqp-reconnected', () => + this._log.warning( + 'AMQP re-connected' + ) + ); + + this._emitter.on( 'error', ( arg ) => + this._handleError( arg ) ); + } + + + private _handleError( e: any ): void + { + let msg: string = ''; + let context = {}; + + if ( e instanceof( Error ) ) + { + msg = e.message; + + if ( hasContext( e ) ) + { + context = e.context; + } + } + + this._log.error( msg, context ); + } +} diff --git a/src/system/MetricsCollector.ts b/src/system/MetricsCollector.ts index 30c4ea2..bc4a6cc 100644 --- a/src/system/MetricsCollector.ts +++ b/src/system/MetricsCollector.ts @@ -21,27 +21,12 @@ * Collect Metrics for Prometheus */ -import { DeltaDao } from "./db/DeltaDao"; import { Histogram, Pushgateway, Counter, Gauge } from 'prom-client'; import { EventEmitter } from "events"; -import { PrometheusFactory } from './PrometheusFactory'; +import { PrometheusFactory, PrometheusConfig } from './PrometheusFactory'; const client = require( 'prom-client' ) -export declare type PrometheusConfig = { - /** The hostname to connect to */ - hostname: string; - - /** The port to connect to */ - port: number; - - /** The environment ( dev, test, demo, live ) */ - env: string; - - /** The rate (in milliseconds) at which metrics are pushed */ - push_interval_ms: number; -} - export type MetricTimer = ( _start_time?: [ number, number ] @@ -78,6 +63,8 @@ export class MetricsCollector /** Timing map */ private _timing_map: Record = {}; + private _push_interval: NodeJS.Timer; + /** * Initialize delta logger @@ -133,8 +120,7 @@ export class MetricsCollector ); // Push metrics on a specific interval - setInterval( - () => + this._push_interval = setInterval( () => { this._gateway.pushAdd( { jobName: 'liza_delta_metrics' }, this.pushCallback @@ -147,6 +133,15 @@ export class MetricsCollector } + /** + * Stop the push interval + */ + stop(): void + { + clearInterval( this._push_interval ); + } + + /** * List to events to update metrics */ @@ -154,10 +149,7 @@ export class MetricsCollector { this._emitter.on( 'delta-process-start', - ( uid: string ) => - { - this._timing_map[ uid ] = this._timer(); - } + ( uid: string ) => { this._timing_map[ uid ] = this._timer(); } ); this._emitter.on( @@ -166,7 +158,7 @@ export class MetricsCollector { const start_time_ms = this._timing_map[ uid ] || [ -1, -1 ]; const t = this._timer( start_time_ms ); - const total_time_ms = ( ( t[ 0 ] * 1000 ) + ( t[ 1 ] / 1000 ) ); + const total_time_ms = t[ 0 ] * 1000 + t[ 1 ] / 1000000; this._process_time.observe( total_time_ms ); this._total_processed.inc(); @@ -188,27 +180,23 @@ export class MetricsCollector * @param body - The resposne body */ private pushCallback( - _error?: Error | undefined, + error?: Error | undefined, _response?: any, _body?: any ): void { - console.log( 'Push callback' ); - console.error( _error ); + if ( error ) + { + this._emitter.emit( 'error', error ); + } } /** - * Look for mongodb delta errors and update metrics if found - * - * @return any errors the occurred + * Update metrics with current error count */ - checkForErrors( dao: DeltaDao ): NullableError + updateErrorCount( count: number ): void { - dao.getErrorCount() - .then( count => { this._current_error.set( +count ); } ) - .catch( err => { return err; } ); - - return null; + this._current_error.set( +count ); } } diff --git a/src/system/PrometheusFactory.ts b/src/system/PrometheusFactory.ts index 088612e..0cf059f 100644 --- a/src/system/PrometheusFactory.ts +++ b/src/system/PrometheusFactory.ts @@ -22,6 +22,42 @@ */ import { Pushgateway, Histogram, Counter, Gauge } from 'prom-client'; + +export declare type PrometheusConfig = { + /** The hostname to connect to */ + hostname: string; + + /** The port to connect to */ + port: number; + + /** The environment ( dev, test, demo, live ) */ + env: string; + + /** The rate (in milliseconds) at which metrics are pushed */ + push_interval_ms: number; +} + + +/** + * Create a prometheus configuration from the environment + * + * @param env - the environment variables + * + * @return the prometheus configuration + */ +export function createPrometheusConfig( + env: NodeJS.ProcessEnv +): PrometheusConfig +{ + return { + 'hostname': env.prom_hostname, + 'port': +( env.prom_port || 0 ), + 'env': process.env.NODE_ENV, + 'push_interval_ms': +( process.env.prom_push_interval_ms || 5000 ), + }; +} + + export class PrometheusFactory { /** diff --git a/src/system/PsrLogger.ts b/src/system/PsrLogger.ts new file mode 100644 index 0000000..276c78e --- /dev/null +++ b/src/system/PsrLogger.ts @@ -0,0 +1,117 @@ +/** + * PSR logger + * + * Copyright (C) 2010-2019 R-T Specialty, LLC. + * + * This file is part of liza. + * + * liza is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + * PSR-3 style logger + */ + +export enum LogLevel { + DEBUG, + INFO, + NOTICE, + WARNING, + ERROR, + CRITICAL, + ALERT, + EMERGENCY, +}; + + +export interface PsrLogger +{ + /** + * Log at a debug level + * + * @param msg - the message to log + * @param context - additional message context + */ + debug( msg: string | object, context?: object ): void + + + /** + * Log at a debug level + * + * @param msg - the message to log + * @param context - additional message context + */ + info( msg: string | object, context?: object ): void + + + /** + * Log at a debug level + * + * @param msg - the message to log + * @param context - additional message context + */ + notice( msg: string | object, context?: object ): void + + + /** + * Log at a debug level + * + * @param msg - the message to log + * @param context - additional message context + */ + warning( msg: string | object, context?: object ): void + + + /** + * Log at a debug level + * + * @param msg - the message to log + * @param context - additional message context + */ + error( msg: string | object, context?: object ): void + + + /** + * Log at a debug level + * + * @param msg - the message to log + * @param context - additional message context + */ + critical( msg: string | object, context?: object ): void + + + /** + * Log at a debug level + * + * @param msg - the message to log + * @param context - additional message context + */ + alert( msg: string | object, context?: object ): void + + + /** + * Log at a debug level + * + * @param msg - the message to log + * @param context - additional message context + */ + emergency( msg: string | object, context?: object ): void + + + /** + * Log a message + * + * @param msg - the message to log + * @param context - additional message context + */ + log( level: LogLevel, msg: string | object, context?: object ): void +} diff --git a/src/system/StandardLogger.ts b/src/system/StandardLogger.ts new file mode 100644 index 0000000..d69c3d4 --- /dev/null +++ b/src/system/StandardLogger.ts @@ -0,0 +1,198 @@ +/** + * Stdout logger + * + * Copyright (C) 2010-2019 R-T Specialty, LLC. + * + * This file is part of liza. + * + * liza is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + * Standard out logger implementing PSR-3 standards + */ +import { PsrLogger, LogLevel } from './PsrLogger'; + +declare type StructuredLog = { + message: string; + timestamp: UnixTimestamp; + service: string; + env: string; + severity: string; + context?: Record; +} + +export class StandardLogger implements PsrLogger +{ + /** + * Initialize logger + * + * @param _console + * @param _ts_ctr - a timestamp constructor + * @param _env - The environment ( dev, test, demo, live ) + */ + constructor( + private readonly _console: Console, + private readonly _ts_ctr: () => UnixTimestamp, + private readonly _env: string, + ) {} + + + /** + * Log at a debug level + * + * @param msg - the message to log + * @param context - additional message context + */ + debug( msg: string | object, context?: object ): void + { + this._console.info( this._format( LogLevel.DEBUG, msg, context ) ); + } + + + /** + * Log at a debug level + * + * @param msg - the message to log + * @param context - additional message context + */ + info( msg: string | object, context?: object ): void + { + this._console.info( this._format( LogLevel.INFO, msg, context ) ); + } + + + /** + * Log at a debug level + * + * @param msg - the message to log + * @param context - additional message context + */ + notice( msg: string | object, context?: object ): void + { + this._console.log( this._format( LogLevel.NOTICE, msg, context ) ); + } + + + /** + * Log at a debug level + * + * @param msg - the message to log + * @param context - additional message context + */ + warning( msg: string | object, context?: object ): void + { + this._console.warn( this._format( LogLevel.WARNING, msg, context ) ); + } + + + /** + * Log at a debug level + * + * @param msg - the message to log + * @param context - additional message context + */ + error( msg: string | object, context?: object ): void + { + this._console.error( this._format( LogLevel.ERROR, msg, context ) ); + } + + + /** + * Log at a debug level + * + * @param msg - the message to log + * @param context - additional message context + */ + critical( msg: string | object, context?: object ): void + { + this._console.error( this._format( LogLevel.CRITICAL, msg, context ) ); + } + + + /** + * Log at a debug level + * + * @param msg - the message to log + * @param context - additional message context + */ + alert( msg: string | object, context?: object ): void + { + this._console.error( this._format( LogLevel.ALERT, msg, context ) ); + } + + + /** + * Log at a debug level + * + * @param msg - the message to log + * @param context - additional message context + */ + emergency( msg: string | object, context?: object ): void + { + this._console.error( this._format( LogLevel.EMERGENCY, msg, context ) ); + } + + + /** + * Log a message + * + * @param msg - the message to log + * @param context - additional message context + */ + log( level: LogLevel, msg: string | object, context?: object ): void + { + this._console.error( this._format( level, msg, context ) ); + } + + + /** + * Get structured log object + * + * @param msg - the string or object to log + * @param level - the log level + * + * @returns a structured logging object + */ + private _format( + level: LogLevel, + msg: string | object, + context: object = {}, + ): StructuredLog + { + let str: string; + + if ( msg !== null && typeof( msg ) === 'object' ) + { + str = JSON.stringify( msg ); + } + else + { + str = msg; + } + + const structured_log = { + message: str, + timestamp: this._ts_ctr(), + service: 'quote-server', + env: this._env, + severity: LogLevel[level], + }; + + if ( Object.keys( context ).length > 0 ) + { + structured_log[ "context" ] = context; + } + + return structured_log; + } +} diff --git a/src/system/amqp/AmqpConnection.ts b/src/system/amqp/AmqpConnection.ts new file mode 100644 index 0000000..13b9791 --- /dev/null +++ b/src/system/amqp/AmqpConnection.ts @@ -0,0 +1,154 @@ +/** + * Amqp Connection + * + * Copyright (C) 2010-2019 R-T Specialty, LLC. + * + * This file is part of liza. + * + * liza is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + * Amqp Connection + */ +import { AmqpConfig } from '../AmqpPublisher'; +import { EventEmitter } from "events"; +import { + connect as AmqpConnect, + Channel, + Connection, +} from 'amqplib'; + + +export class AmqpConnection +{ + /** The amqp connection */ + private _conn?: Connection; + + /** The amqp channel */ + private _channel?: Channel; + + + /** + * Amqp Connection + * + * @param _conf - amqp configuration + * @param _emitter - event emitter instance + */ + constructor( + private readonly _conf: AmqpConfig, + private readonly _emitter: EventEmitter, + ) {} + + + /** + * Initialize connection + */ + connect(): Promise + { + return AmqpConnect( this._conf ) + .then( conn => + { + this._conn = conn; + + /** If there is an error, attempt to reconnect + * Only hook this once because it will be re-hooked on each + * successive successful connection + */ + this._conn.once( 'error', e => + { + this._emitter.emit( 'amqp-conn-error', e ); + this._reconnect(); + } ); + + return this._conn.createChannel(); + } ) + .then( ( ch: Channel ) => + { + this._channel = ch; + + this._channel.assertExchange( + this._conf.exchange, + 'fanout', + { durable: true } + ); + } ); + } + + + /** + * Attempt to re-establish the connection + * + * @param retry_count - the number of retries attempted + */ + private _reconnect( retry_count: number = 0 ): void + { + if ( retry_count >= this._conf.retries ) + { + this._emitter.emit( + 'error', + new Error( 'Could not re-establish AMQP connection.' ) + ); + + return; + } + + this._emitter.emit( 'amqp-reconnect' ); + + this.connect() + .then( _ => { this._emitter.emit( 'amqp-reconnected' ) } ) + .catch( _ => + { + const wait_ms = this._conf.retry_wait; + setTimeout( () => this._reconnect( ++retry_count ), wait_ms ); + } ); + } + + + /** + * Returns the exchange to publish to + * + * @return exchange name + */ + getExchangeName(): string + { + return this._conf.exchange; + } + + + /** + * Returns the amqp channel + * + * @return exchange name + */ + getAmqpChannel(): Channel | undefined + { + if ( !this._channel ) + { + this._reconnect(); + } + + return this._channel; + } + + + /** + * Close the amqp conenction + */ + close(): void + { + if ( this._conn ) + { + this._conn.close.bind(this._conn); + } + } +} \ No newline at end of file diff --git a/src/system/avro/AvroFactory.ts b/src/system/avro/AvroFactory.ts new file mode 100644 index 0000000..0a07a32 --- /dev/null +++ b/src/system/avro/AvroFactory.ts @@ -0,0 +1,90 @@ +/** + * Factory functions for avro + * + * Copyright (C) 2010-2019 R-T Specialty, LLC. + * + * This file is part of the Liza Data Collection Framework. + * + * liza is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +import { Duplex } from 'stream'; + +const avro = require( 'avro-js' ); + + +export interface AvroSchema +{ + /** + * Write data to a buffer + * + * @param data - the data to write + * + * @return the buffer if successful + */ + toBuffer( data: Record ): Buffer | null; + + + /** + * Check if data is valid against schema + * + * @param data - the data to validate + * @param opts - options specified as key/values + * + * @return the buffer if it is valid + */ + isValid( + data: Record, + opts?: Record + ): Buffer | null; + + + /** + * Write to a buffer + * + * @param data - the data to write + * @param buffer - the buffer that will be written to + */ + encode( data: Record, buffer: Buffer ): void; + + + /** + * Output to a json string + * + * @param data - the data to format + * + * @return the formatted data + */ + toString( data: Record ): string; + + + /** + * Deserialize from a buffer + * + * @param buffer - the buffer to read from + * + * @return the resulting data + */ + fromBuffer( buffer: Buffer ): any; +} + + +/** The avro encoder constructor type */ +export type AvroEncoderCtr = ( type: AvroSchema ) => Duplex; + + +/** The avro encoder constructor */ +export function createAvroEncoder( schema: AvroSchema ): Duplex +{ + return new avro.streams.BlockEncoder( schema ); +} \ No newline at end of file diff --git a/src/system/avro/schema.avsc b/src/system/avro/schema.avsc index 4a9a609..53e4a9d 100644 --- a/src/system/avro/schema.avsc +++ b/src/system/avro/schema.avsc @@ -77,23 +77,6 @@ ] } }, - { - "name": "session", - "type": { - "type": "record", - "name": "Session", - "fields": [ - { - "name": "entity_name", - "type": "string" - }, - { - "name": "entity_id", - "type": "int" - } - ] - } - }, { "name": "data", "type": [ @@ -160,6 +143,13 @@ } ] }, + { + "name": "ratedata", + "type": [ + "null", + "Data" + ] + }, { "name": "delta", "type": [ @@ -191,4 +181,3 @@ } ] } - diff --git a/src/system/db/DeltaDao.ts b/src/system/db/DeltaDao.ts index d7d2544..881bc79 100644 --- a/src/system/db/DeltaDao.ts +++ b/src/system/db/DeltaDao.ts @@ -28,6 +28,7 @@ */ import { DocumentId } from "../../document/Document"; +import { DeltaDocument } from "../../bucket/delta"; /** Manage deltas */ @@ -38,7 +39,7 @@ export interface DeltaDao * * @return documents in need of processing */ - getUnprocessedDocuments(): Promise[]> + getUnprocessedDocuments(): Promise /** @@ -46,8 +47,6 @@ export interface DeltaDao * * @param doc_id - Document whose index will be set * @param type - Delta type - * - * @return any errors that occurred */ advanceDeltaIndex( doc_id: DocumentId, @@ -61,8 +60,6 @@ export interface DeltaDao * * @param doc_id - The document to mark * @param last_update_ts - The last time this document was updated - * - * @return any errors that occurred */ markDocumentAsProcessed( doc_id: DocumentId, diff --git a/src/system/db/MongoDeltaDao.ts b/src/system/db/MongoDeltaDao.ts index 4ab4e91..8d45684 100644 --- a/src/system/db/MongoDeltaDao.ts +++ b/src/system/db/MongoDeltaDao.ts @@ -26,7 +26,7 @@ import { DeltaDao } from './DeltaDao'; import { MongoCollection } from 'mongodb'; import { context } from '../../error/ContextError'; import { DaoError } from '../../error/DaoError'; -import { DeltaType } from '../../bucket/delta'; +import { DeltaType, DeltaDocument } from '../../bucket/delta'; /** Manage deltas */ export class MongoDeltaDao implements DeltaDao @@ -37,11 +37,21 @@ export class MongoDeltaDao implements DeltaDao /** The data delta type */ static readonly DELTA_DATA: string = 'data'; + /** The document fields to read */ + readonly RESULT_FIELDS: Record = { + id: 1, + lastUpdate: 1, + data: 1, + ratedata: 1, + rdelta: 1, + totalPublishDelta: 1, + }; + /** * Initialize connection * - * @param _db Mongo db + * @param _collection - Mongo db collection */ constructor( private readonly _collection: MongoCollection, @@ -53,7 +63,7 @@ export class MongoDeltaDao implements DeltaDao * * @return documents in need of processing */ - getUnprocessedDocuments(): Promise[]> + getUnprocessedDocuments(): Promise { return new Promise( ( resolve, reject ) => { @@ -62,7 +72,7 @@ export class MongoDeltaDao implements DeltaDao published: false, deltaError: false, }, - {}, + { fields: this.RESULT_FIELDS }, ( e, cursor ) => { if ( e ) @@ -75,7 +85,7 @@ export class MongoDeltaDao implements DeltaDao return } - cursor.toArray( ( e: Error, data: any[] ) => + cursor.toArray( ( e: Error, data: DeltaDocument[] ) => { if ( e ) { @@ -100,13 +110,8 @@ export class MongoDeltaDao implements DeltaDao * * @param doc_id - Document whose index will be set * @param type - Delta type - * - * @return any errors that occurred */ - advanceDeltaIndex( - doc_id: DocumentId, - type: DeltaType, - ): Promise + advanceDeltaIndex( doc_id: DocumentId, type: DeltaType ): Promise { return new Promise( ( resolve, reject ) => { @@ -123,9 +128,7 @@ export class MongoDeltaDao implements DeltaDao if ( e ) { reject( context( - new DaoError( - 'Error advancing delta index: ' + e - ), + new DaoError( 'Error advancing delta index: ' + e ), { doc_id: doc_id, type: type, @@ -149,8 +152,6 @@ export class MongoDeltaDao implements DeltaDao * * @param doc_id - The document to mark * @param last_update_ts - The last time this document was updated - * - * @return any errors that occurred */ markDocumentAsProcessed( doc_id: DocumentId, diff --git a/src/system/db/MongoFactory.ts b/src/system/db/MongoFactory.ts new file mode 100644 index 0000000..5a3b03e --- /dev/null +++ b/src/system/db/MongoFactory.ts @@ -0,0 +1,176 @@ +/** + * Mongo Factory functions + * + * Copyright (C) 2010-2019 R-T Specialty, LLC. + * + * This file is part of the Liza Data Collection Framework. + * + * liza is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * These definitions are for a very old mongodb library, which will be + * once we get around to updating node. Quite a failure on the maintenance + * front. + * + * instantiate objects for MongoDb + */ +import { MongoDb, MongoDbConfig, MongoCollection } from '../../types/mongodb'; +import { DaoError } from '../../error/DaoError'; + + +const { + Db: MongoDb, + Server: MongoServer, + ReplServers: ReplSetServers, +} = require( 'mongodb' ); + + +/** + * Create a mongodb configuration from the environment + * + * @param env - the environment variables + * + * @return the mongo configuration + */ +export function createMongoConfig( env: NodeJS.ProcessEnv ): MongoDbConfig +{ + return { + 'port': +( env.MONGODB_PORT || 0 ), + 'ha': +( env.LIZA_MONGODB_HA || 0 ) == 1, + 'replset': env.LIZA_MONGODB_REPLSET, + 'host': env.MONGODB_HOST, + 'host_a': env.LIZA_MONGODB_HOST_A, + 'port_a': +( env.LIZA_MONGODB_PORT_A || 0 ), + 'host_b': env.LIZA_MONGODB_HOST_B, + 'port_b': +( env.LIZA_MONGODB_PORT_B || 0 ), + 'collection': 'quotes', + }; +} + + +/** + * Create the database connection + * + * @param conf - the configuration from the environment + * + * @return the mongodb connection + */ +export function createMongoDB( conf: MongoDbConfig ): MongoDb +{ + if( conf.ha ) + { + var mongodbPort = conf.port || 27017; + var mongodbReplSet = conf.replset || 'rs0'; + var dbServers = new ReplSetServers( + [ + new MongoServer( conf.host_a, conf.port_a || mongodbPort), + new MongoServer( conf.host_b, conf.port_b || mongodbPort), + ], + {rs_name: mongodbReplSet, auto_reconnect: true} + ); + } + else + { + var dbServers = new MongoServer( + conf.host || '127.0.0.1', + conf.port || 27017, + {auto_reconnect: true} + ); + } + var db = new MongoDb( + 'program', + dbServers, + {native_parser: false, safe: false} + ); + return db; +} + + +/** + * Attempts to connect to the database and retrieve the collection + * + * connectError event will be emitted on failure. + * + * @param db - the mongo database + * @param conf - the mongo configuration + * + * @return the collection + */ +export function getMongoCollection( + db: MongoDb, + conf: MongoDbConfig +): Promise +{ + return new Promise( ( resolve, reject ) => + { + // attempt to connect to the database + db.open( ( e: any, db: any ) => + { + // if there was an error, don't bother with anything else + if ( e ) + { + // in some circumstances, it may just be telling us that + // we're already connected (even though the connection may + // have been broken) + if ( e.errno !== undefined ) + { + reject( new Error( + 'Error opening mongo connection: ' + e + ) ); + return; + } + } else if ( db == null ) + { + reject( new DaoError( 'No database connection' ) ); + return; + } + + // quotes collection + db.collection( + conf.collection, + ( e: any, collection: MongoCollection ) => + { + if ( e ) + { + reject( new DaoError( + 'Error creating collection: ' + e + ) ); + return; + } + + // initialize indexes + collection.createIndex( + [ + ['published', 1], + ['deltaError', 1], + ], + true, + ( e: any, _index: { [P: string]: any } ) => + { + if ( e ) + { + reject( new DaoError( + 'Error creating index: ' + e + ) ); + return; + } + + resolve( collection ); + return; + } + ); + } + ); + } ); + } ); +} \ No newline at end of file diff --git a/src/types/mongodb.d.ts b/src/types/mongodb.d.ts index 4642223..bbbfa46 100644 --- a/src/types/mongodb.d.ts +++ b/src/types/mongodb.d.ts @@ -62,6 +62,15 @@ export interface MongoDb * @param callback continuation on completion */ close( callback: MongoCallback ): void; + + + /** + * Hook events + * + * @param event_id - the event to hook + * @param callback - a function to call in response to the event + */ + on( event_id: string, callback: ( err: Error ) => void ): void; } @@ -107,6 +116,9 @@ interface MongoFindOptions /** Whether to project only id's */ id?: number, + + /** Which fields to include in the result set */ + fields?: Record, } @@ -236,6 +248,10 @@ declare interface MongoCollection /** * Creates an index on the collection + * + * @param fieldOrSpec - indexes to create + * @param options - mongo options + * @param callback - continuation on completion */ createIndex( fieldOrSpec: MongoIndexSpecification, @@ -246,6 +262,9 @@ declare interface MongoCollection /** * Creates an index on the collection + * + * @param docs - documents to insert + * @param callback - continuation on completion */ insert( docs: MongoInsertSpecification, diff --git a/test/server/db/MongoServerDaoTest.js b/test/server/db/MongoServerDaoTest.js deleted file mode 100644 index d6c8bf1..0000000 --- a/test/server/db/MongoServerDaoTest.js +++ /dev/null @@ -1,157 +0,0 @@ -/** - * Tests MongoServerDao - * - * Copyright (C) 2010-2019 R-T Specialty, LLC. - * - * This file is part of the Liza Data Collection Framework. - * - * liza is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ -'use strict'; -Object.defineProperty(exports, "__esModule", { value: true }); -var MongoServerDao_1 = require("../../../src/server/db/MongoServerDao"); -var chai_1 = require("chai"); -chai_1.use(require('chai-as-promised')); -describe('MongoServerDao', function () { - describe('#saveQuote', function () { - describe("with no save data or push data", function () { - it("saves entire metabucket record individually", function (done) { - var metadata = { - foo: ['bar', 'baz'], - bar: [{ quux: 'quuux' }], - }; - var quote = createStubQuote(metadata); - var sut = new MongoServerDao_1.MongoServerDao(createMockDb( - // update - function (_selector, data) { - chai_1.expect(data.$set['meta.foo']) - .to.deep.equal(metadata.foo); - chai_1.expect(data.$set['meta.bar']) - .to.deep.equal(metadata.bar); - chai_1.expect(data.$push).to.equal(undefined); - done(); - })); - sut.init(function () { - return sut.saveQuote(quote, function () { }, function () { }); - }); - }); - }); - describe("with push data", function () { - it("adds push data to the collection", function (done) { - var push_data = { - foo: ['bar', 'baz'], - bar: [{ quux: 'quuux' }], - }; - var quote = createStubQuote({}); - var sut = new MongoServerDao_1.MongoServerDao(createMockDb( - // update - function (_selector, data) { - chai_1.expect(data.$push['foo']) - .to.deep.equal(push_data.foo); - chai_1.expect(data.$push['bar']) - .to.deep.equal(push_data.bar); - done(); - })); - sut.init(function () { - return sut.saveQuote(quote, function () { }, function () { }, undefined, push_data); - }); - }); - it("skips push data when it is an empty object", function (done) { - var push_data = {}; - var quote = createStubQuote({}); - var sut = new MongoServerDao_1.MongoServerDao(createMockDb( - // update - function (_selector, data) { - chai_1.expect(data.$push).to.equal(undefined); - done(); - })); - sut.init(function () { - return sut.saveQuote(quote, function () { }, function () { }, undefined, push_data); - }); - }); - }); - }); -}); -function createMockDb(on_update) { - var collection_quotes = { - update: on_update, - createIndex: function (_, __, c) { return c(); }, - }; - var collection_seq = { - find: function (_, __, c) { - c(null, { - toArray: function (c) { return c(null, { length: 5 }); }, - }); - }, - }; - var db = { - collection: function (id, c) { - var coll = (id === 'quotes') - ? collection_quotes - : collection_seq; - c(null, coll); - }, - }; - var driver = { - open: function (c) { return c(null, db); }, - on: function () { }, - }; - return driver; -} -function createStubQuote(metadata) { - var program = { - getId: function () { return '1'; }, - ineligibleLockCount: 0, - apis: {}, - internal: {}, - meta: { - arefs: {}, - fields: {}, - groups: {}, - qdata: {}, - qtypes: {}, - }, - mapis: {}, - initQuote: function () { }, - }; - var quote = { - getBucket: function () { return ({ - getData: function () { }, - }); }, - getMetabucket: function () { return ({ - getData: function () { return metadata; }, - }); }, - getId: function () { return 123; }, - getProgramVersion: function () { return 'Foo'; }, - getLastPremiumDate: function () { return 0; }, - getRatedDate: function () { return 0; }, - getExplicitLockReason: function () { return ""; }, - getExplicitLockStep: function () { return 1; }, - isImported: function () { return false; }, - isBound: function () { return false; }, - getTopVisitedStepId: function () { return 1; }, - getTopSavedStepId: function () { return 1; }, - setRatedDate: function () { return quote; }, - setRateBucket: function () { return quote; }, - setRatingData: function () { return quote; }, - getRatingData: function () { return ({ _unavailable_all: '0' }); }, - getProgram: function () { return program; }, - setExplicitLock: function () { return quote; }, - getProgramId: function () { return 'Foo'; }, - getCurrentStepId: function () { return 0; }, - setLastPremiumDate: function () { return quote; }, - }; - return quote; -} -//# sourceMappingURL=data:application/json;base64,eyJ2ZXJzaW9uIjozLCJmaWxlIjoiTW9uZ29TZXJ2ZXJEYW9UZXN0LmpzIiwic291cmNlUm9vdCI6IiIsInNvdXJjZXMiOlsiTW9uZ29TZXJ2ZXJEYW9UZXN0LnRzIl0sIm5hbWVzIjpbXSwibWFwcGluZ3MiOiJBQUFBOzs7Ozs7Ozs7Ozs7Ozs7Ozs7O0dBbUJHO0FBRUgsWUFBWSxDQUFDOztBQUViLHdFQUE4RTtBQUU5RSw2QkFBK0M7QUFRL0MsVUFBUSxDQUFFLE9BQU8sQ0FBRSxrQkFBa0IsQ0FBRSxDQUFFLENBQUM7QUFHMUMsUUFBUSxDQUFFLGdCQUFnQixFQUFFO0lBRXhCLFFBQVEsQ0FBRSxZQUFZLEVBQUU7UUFFcEIsUUFBUSxDQUFFLGdDQUFnQyxFQUFFO1lBRXhDLEVBQUUsQ0FBRSw2Q0FBNkMsRUFBRSxVQUFBLElBQUk7Z0JBRW5ELElBQU0sUUFBUSxHQUFHO29CQUNiLEdBQUcsRUFBRSxDQUFFLEtBQUssRUFBRSxLQUFLLENBQUU7b0JBQ3JCLEdBQUcsRUFBRSxDQUFFLEVBQUUsSUFBSSxFQUFFLE9BQU8sRUFBRSxDQUFFO2lCQUM3QixDQUFDO2dCQUVGLElBQU0sS0FBSyxHQUFHLGVBQWUsQ0FBRSxRQUFRLENBQUUsQ0FBQztnQkFFMUMsSUFBTSxHQUFHLEdBQUcsSUFBSSwrQkFBRyxDQUFFLFlBQVk7Z0JBQzdCLFNBQVM7Z0JBQ1QsVUFBRSxTQUF3QixFQUFFLElBQWlCO29CQUV6QyxhQUFNLENBQUUsSUFBSSxDQUFDLElBQUksQ0FBRSxVQUFVLENBQUUsQ0FBRTt5QkFDNUIsRUFBRSxDQUFDLElBQUksQ0FBQyxLQUFLLENBQUUsUUFBUSxDQUFDLEdBQUcsQ0FBRSxDQUFDO29CQUVuQyxhQUFNLENBQUUsSUFBSSxDQUFDLElBQUksQ0FBRSxVQUFVLENBQUUsQ0FBRTt5QkFDNUIsRUFBRSxDQUFDLElBQUksQ0FBQyxLQUFLLENBQUUsUUFBUSxDQUFDLEdBQUcsQ0FBRSxDQUFDO29CQUduQyxhQUFNLENBQUUsSUFBSSxDQUFDLEtBQUssQ0FBRSxDQUFDLEVBQUUsQ0FBQyxLQUFLLENBQUUsU0FBUyxDQUFFLENBQUM7b0JBRTNDLElBQUksRUFBRSxDQUFDO2dCQUNYLENBQUMsQ0FDSixDQUFFLENBQUM7Z0JBRUosR0FBRyxDQUFDLElBQUksQ0FBRTtvQkFDTixPQUFBLEdBQUcsQ0FBQyxTQUFTLENBQUUsS0FBSyxFQUFFLGNBQU8sQ0FBQyxFQUFFLGNBQU8sQ0FBQyxDQUFFO2dCQUExQyxDQUEwQyxDQUM3QyxDQUFDO1lBQ04sQ0FBQyxDQUFFLENBQUM7UUFDUixDQUFDLENBQUUsQ0FBQztRQUVKLFFBQVEsQ0FBRSxnQkFBZ0IsRUFBRTtZQUV4QixFQUFFLENBQUUsa0NBQWtDLEVBQUUsVUFBQSxJQUFJO2dCQUV4QyxJQUFNLFNBQVMsR0FBRztvQkFDZCxHQUFHLEVBQUUsQ0FBRSxLQUFLLEVBQUUsS0FBSyxDQUFFO29CQUNyQixHQUFHLEVBQUUsQ0FBRSxFQUFFLElBQUksRUFBRSxPQUFPLEVBQUUsQ0FBRTtpQkFDN0IsQ0FBQztnQkFFRixJQUFNLEtBQUssR0FBRyxlQUFlLENBQUUsRUFBRSxDQUFFLENBQUM7Z0JBRXBDLElBQU0sR0FBRyxHQUFHLElBQUksK0JBQUcsQ0FBRSxZQUFZO2dCQUM3QixTQUFTO2dCQUNULFVBQUMsU0FBd0IsRUFBRSxJQUFpQjtvQkFFeEMsYUFBTSxDQUFFLElBQUksQ0FBQyxLQUFLLENBQUUsS0FBSyxDQUFFLENBQUU7eUJBQ3hCLEVBQUUsQ0FBQyxJQUFJLENBQUMsS0FBSyxDQUFFLFNBQVMsQ0FBQyxHQUFHLENBQUUsQ0FBQztvQkFFcEMsYUFBTSxDQUFFLElBQUksQ0FBQyxLQUFLLENBQUUsS0FBSyxDQUFFLENBQUU7eUJBQ3hCLEVBQUUsQ0FBQyxJQUFJLENBQUMsS0FBSyxDQUFFLFNBQVMsQ0FBQyxHQUFHLENBQUUsQ0FBQztvQkFFcEMsSUFBSSxFQUFFLENBQUM7Z0JBQ1gsQ0FBQyxDQUNKLENBQUUsQ0FBQztnQkFFSixHQUFHLENBQUMsSUFBSSxDQUFFO29CQUNOLE9BQUEsR0FBRyxDQUFDLFNBQVMsQ0FDVCxLQUFLLEVBQ0wsY0FBTyxDQUFDLEVBQ1IsY0FBTyxDQUFDLEVBQ1IsU0FBUyxFQUNULFNBQVMsQ0FDWjtnQkFORCxDQU1DLENBQ0osQ0FBQztZQUNOLENBQUMsQ0FBRSxDQUFDO1lBRUosRUFBRSxDQUFFLDRDQUE0QyxFQUFFLFVBQUEsSUFBSTtnQkFFbEQsSUFBTSxTQUFTLEdBQUcsRUFBRSxDQUFDO2dCQUVyQixJQUFNLEtBQUssR0FBRyxlQUFlLENBQUUsRUFBRSxDQUFFLENBQUM7Z0JBRXBDLElBQU0sR0FBRyxHQUFHLElBQUksK0JBQUcsQ0FBRSxZQUFZO2dCQUM3QixTQUFTO2dCQUNULFVBQUUsU0FBd0IsRUFBRSxJQUFpQjtvQkFFekMsYUFBTSxDQUFFLElBQUksQ0FBQyxLQUFLLENBQUUsQ0FBQyxFQUFFLENBQUMsS0FBSyxDQUFFLFNBQVMsQ0FBRSxDQUFDO29CQUUzQyxJQUFJLEVBQUUsQ0FBQztnQkFDWCxDQUFDLENBQ0osQ0FBRSxDQUFDO2dCQUVKLEdBQUcsQ0FBQyxJQUFJLENBQUU7b0JBQ04sT0FBQSxHQUFHLENBQUMsU0FBUyxDQUNULEtBQUssRUFDTCxjQUFPLENBQUMsRUFDUixjQUFPLENBQUMsRUFDUixTQUFTLEVBQ1QsU0FBUyxDQUNaO2dCQU5ELENBTUMsQ0FDSixDQUFDO1lBQ04sQ0FBQyxDQUFFLENBQUM7UUFDUixDQUFDLENBQUUsQ0FBQztJQUNSLENBQUMsQ0FBRSxDQUFDO0FBQ1IsQ0FBQyxDQUFFLENBQUM7QUFHSixTQUFTLFlBQVksQ0FBRSxTQUFjO0lBRWpDLElBQU0saUJBQWlCLEdBQUc7UUFDdEIsTUFBTSxFQUFFLFNBQVM7UUFDakIsV0FBVyxFQUFFLFVBQUUsQ0FBTSxFQUFFLEVBQU8sRUFBRSxDQUFNLElBQU0sT0FBQSxDQUFDLEVBQUUsRUFBSCxDQUFHO0tBQ2xELENBQUM7SUFFRixJQUFNLGNBQWMsR0FBRztRQUNuQixJQUFJLEVBQUosVUFBTSxDQUFNLEVBQUUsRUFBTyxFQUFFLENBQU07WUFFekIsQ0FBQyxDQUFFLElBQUksRUFBRTtnQkFDTCxPQUFPLEVBQUUsVUFBRSxDQUFNLElBQU0sT0FBQSxDQUFDLENBQUUsSUFBSSxFQUFFLEVBQUUsTUFBTSxFQUFFLENBQUMsRUFBRSxDQUFFLEVBQXhCLENBQXdCO2FBQ2xELENBQUUsQ0FBQztRQUNSLENBQUM7S0FDSixDQUFDO0lBRUYsSUFBTSxFQUFFLEdBQUc7UUFDUCxVQUFVLEVBQVYsVUFBWSxFQUFPLEVBQUUsQ0FBTTtZQUV2QixJQUFNLElBQUksR0FBRyxDQUFFLEVBQUUsS0FBSyxRQUFRLENBQUU7Z0JBQzVCLENBQUMsQ0FBQyxpQkFBaUI7Z0JBQ25CLENBQUMsQ0FBQyxjQUFjLENBQUM7WUFFckIsQ0FBQyxDQUFFLElBQUksRUFBRSxJQUFJLENBQUUsQ0FBQztRQUNwQixDQUFDO0tBQ0osQ0FBQztJQUVGLElBQU0sTUFBTSxHQUFHO1FBQ1gsSUFBSSxFQUFFLFVBQUUsQ0FBTSxJQUFNLE9BQUEsQ0FBQyxDQUFFLElBQUksRUFBRSxFQUFFLENBQUUsRUFBYixDQUFhO1FBQ2pDLEVBQUUsRUFBSSxjQUFPLENBQUM7S0FDakIsQ0FBQztJQUVGLE9BQU8sTUFBTSxDQUFDO0FBQ2xCLENBQUM7QUFHRCxTQUFTLGVBQWUsQ0FBRSxRQUE2QjtJQUVuRCxJQUFNLE9BQU8sR0FBWTtRQUNyQixLQUFLLEVBQWdCLGNBQU0sT0FBQSxHQUFHLEVBQUgsQ0FBRztRQUM5QixtQkFBbUIsRUFBRSxDQUFDO1FBQ3RCLElBQUksRUFBaUIsRUFBRTtRQUN2QixRQUFRLEVBQWEsRUFBRTtRQUN2QixJQUFJLEVBQWlCO1lBQ2pCLEtBQUssRUFBRyxFQUFFO1lBQ1YsTUFBTSxFQUFFLEVBQUU7WUFDVixNQUFNLEVBQUUsRUFBRTtZQUNWLEtBQUssRUFBRyxFQUFFO1lBQ1YsTUFBTSxFQUFFLEVBQUU7U0FDYjtRQUNELEtBQUssRUFBZ0IsRUFBRTtRQUN2QixTQUFTLEVBQVksY0FBTyxDQUFDO0tBQ2hDLENBQUM7SUFFRixJQUFNLEtBQUssR0FBb0I7UUFDM0IsU0FBUyxFQUFFLGNBQU0sT0FBaUIsQ0FBRTtZQUNoQyxPQUFPLEVBQUUsY0FBTyxDQUFDO1NBQ3BCLENBQUUsRUFGYyxDQUVkO1FBRUgsYUFBYSxFQUFFLGNBQU0sT0FBaUIsQ0FBRTtZQUNwQyxPQUFPLEVBQUUsY0FBTSxPQUFBLFFBQVEsRUFBUixDQUFRO1NBQzFCLENBQUUsRUFGa0IsQ0FFbEI7UUFFSCxLQUFLLEVBQWtCLGNBQU0sT0FBUyxHQUFHLEVBQVosQ0FBWTtRQUN6QyxpQkFBaUIsRUFBTSxjQUFNLE9BQUEsS0FBSyxFQUFMLENBQUs7UUFDbEMsa0JBQWtCLEVBQUssY0FBTSxPQUFlLENBQUMsRUFBaEIsQ0FBZ0I7UUFDN0MsWUFBWSxFQUFXLGNBQU0sT0FBZSxDQUFDLEVBQWhCLENBQWdCO1FBQzdDLHFCQUFxQixFQUFFLGNBQU0sT0FBQSxFQUFFLEVBQUYsQ0FBRTtRQUMvQixtQkFBbUIsRUFBSSxjQUFNLE9BQWlCLENBQUMsRUFBbEIsQ0FBa0I7UUFDL0MsVUFBVSxFQUFhLGNBQU0sT0FBQSxLQUFLLEVBQUwsQ0FBSztRQUNsQyxPQUFPLEVBQWdCLGNBQU0sT0FBQSxLQUFLLEVBQUwsQ0FBSztRQUNsQyxtQkFBbUIsRUFBSSxjQUFNLE9BQWlCLENBQUMsRUFBbEIsQ0FBa0I7UUFDL0MsaUJBQWlCLEVBQU0sY0FBTSxPQUFpQixDQUFDLEVBQWxCLENBQWtCO1FBQy9DLFlBQVksRUFBVyxjQUFNLE9BQUEsS0FBSyxFQUFMLENBQUs7UUFDbEMsYUFBYSxFQUFVLGNBQU0sT0FBQSxLQUFLLEVBQUwsQ0FBSztRQUNsQyxhQUFhLEVBQVUsY0FBTSxPQUFBLEtBQUssRUFBTCxDQUFLO1FBQ2xDLGFBQWEsRUFBVSxjQUFNLE9BQUEsQ0FBWSxFQUFFLGdCQUFnQixFQUFFLEdBQUcsRUFBRSxDQUFBLEVBQXJDLENBQXFDO1FBQ2xFLFVBQVUsRUFBYSxjQUFNLE9BQUEsT0FBTyxFQUFQLENBQU87UUFDcEMsZUFBZSxFQUFRLGNBQU0sT0FBQSxLQUFLLEVBQUwsQ0FBSztRQUNsQyxZQUFZLEVBQVcsY0FBTSxPQUFBLEtBQUssRUFBTCxDQUFLO1FBQ2xDLGdCQUFnQixFQUFPLGNBQU0sT0FBQSxDQUFDLEVBQUQsQ0FBQztRQUM5QixrQkFBa0IsRUFBSyxjQUFNLE9BQUEsS0FBSyxFQUFMLENBQUs7S0FDckMsQ0FBQztJQUVGLE9BQU8sS0FBSyxDQUFDO0FBQ2pCLENBQUMifQ== \ No newline at end of file diff --git a/test/server/db/MongoServerDaoTest.ts b/test/server/db/MongoServerDaoTest.ts index b0a83bb..58e6ab9 100644 --- a/test/server/db/MongoServerDaoTest.ts +++ b/test/server/db/MongoServerDaoTest.ts @@ -22,7 +22,7 @@ 'use strict'; import { MongoServerDao as Sut } from "../../../src/server/db/MongoServerDao"; -import { MongoSelector, MongoUpdate } from "mongodb"; +import { MongoSelector, MongoUpdate, MongoDb } from "mongodb"; import { expect, use as chai_use } from 'chai'; import { ServerSideQuote } from "../../../src/server/quote/ServerSideQuote"; import { PositiveInteger } from "../../../src/numeric"; @@ -139,7 +139,7 @@ describe( 'MongoServerDao', () => } ); -function createMockDb( on_update: any ) +function createMockDb( on_update: any ): MongoDb { const collection_quotes = { update: on_update, @@ -166,8 +166,9 @@ function createMockDb( on_update: any ) }, }; - const driver = { + const driver = { open: ( c: any ) => c( null, db ), + close: () => {}, on: () => {}, }; diff --git a/test/system/DeltaProcessorTest.ts b/test/system/DeltaProcessorTest.ts index d0cd8f8..3998d68 100644 --- a/test/system/DeltaProcessorTest.ts +++ b/test/system/DeltaProcessorTest.ts @@ -22,7 +22,8 @@ import { DeltaProcessor as Sut } from '../../src/system/DeltaProcessor'; import { AmqpPublisher } from '../../src/system/AmqpPublisher'; import { DeltaDao } from '../../src/system/db/DeltaDao'; -import { DeltaType } from "../../src/bucket/delta"; +import { DeltaType, DeltaDocument } from "../../src/bucket/delta"; +import { DocumentId } from '../../src/document/Document'; import { EventEmitter } from 'events'; import { expect, use as chai_use } from 'chai'; @@ -308,11 +309,12 @@ describe( 'system.DeltaProcessor', () => }[]>[ { label: 'No deltas are processed', - docs: [ + given: [ { id: 123, lastUpdate: 123123123, - bucket: {}, + data: {}, + ratedata: {}, rdelta: {}, }, ], @@ -324,7 +326,8 @@ describe( 'system.DeltaProcessor', () => { id: 123, lastUpdate: 123123123, - bucket: { foo: [ 'start_bar' ] }, + data: { foo: [ 'start_bar' ] }, + ratedata: {}, rdelta: { data: [ { @@ -341,14 +344,16 @@ describe( 'system.DeltaProcessor', () => ], expected: [ { - delta: { foo: [ 'first_bar' ] }, - bucket: { foo: [ 'first_bar' ] }, - doc_id: 123, + doc_id: 123, + delta: { foo: [ 'first_bar' ] }, + bucket: { foo: [ 'first_bar' ] }, + ratedata: {}, }, { - delta: { foo: [ 'second_bar' ] }, - bucket: { foo: [ 'second_bar' ] }, - doc_id: 123, + doc_id: 123, + delta: { foo: [ 'second_bar' ] }, + bucket: { foo: [ 'second_bar' ] }, + ratedata: {}, }, ], }, @@ -358,17 +363,18 @@ describe( 'system.DeltaProcessor', () => { id: 123, lastUpdate: 123123123, - bucket: { foo: 'start_bar' }, + data: { foo: [ 'start_bar_123' ] }, + ratedata: {}, rdelta: { data: [ { - data: { foo: [ 'second_bar' ] }, + data: { foo: [ 'second_bar_123' ] }, timestamp: 234, }, ], ratedata: [ { - data: { foo: [ 'first_bar' ] }, + data: { foo: [ 'first_bar_123' ] }, timestamp: 123, }, ], @@ -377,19 +383,20 @@ describe( 'system.DeltaProcessor', () => { id: 234, lastUpdate: 123123123, - bucket: { foo: 'start_bar' }, + data: { foo: [ 'start_bar_234' ] }, + ratedata: {}, rdelta: { data: [ { - data: { foo: [ 'first_bar' ] }, + data: { foo: [ 'first_bar_234' ] }, timestamp: 123, }, { - data: { foo: [ 'second_bar' ] }, + data: { foo: [ 'second_bar_234' ] }, timestamp: 234, }, { - data: { foo: [ 'third_bar' ] }, + data: { foo: [ 'third_bar_234' ] }, timestamp: 345, }, ], @@ -398,15 +405,16 @@ describe( 'system.DeltaProcessor', () => { id: 345, lastUpdate: 123123123, - bucket: { foo: 'start_bar' }, + data: { foo: [ 'start_bar_345' ] }, + ratedata: {}, rdelta: { ratedata: [ { - data: { foo: [ 'first_bar' ] }, + data: { foo: [ 'first_bar_345' ] }, timestamp: 123, }, { - data: { foo: [ 'second_bar' ] }, + data: { foo: [ 'second_bar_345' ] }, timestamp: 234, }, ], @@ -415,60 +423,73 @@ describe( 'system.DeltaProcessor', () => ], expected: [ { - delta: { foo: [ 'first_bar' ] }, - bucket: { foo: [ 'first_bar' ] }, - doc_id: 123, + doc_id: 123, + delta: { foo: [ 'first_bar_123' ] }, + bucket: { foo: [ 'start_bar_123' ] }, + ratedata: { foo: [ 'first_bar_123' ] }, }, { - delta: { foo: [ 'second_bar' ] }, - bucket: { foo: [ 'second_bar' ] }, - doc_id: 123, + doc_id: 123, + delta: { foo: [ 'second_bar_123' ] }, + bucket: { foo: [ 'second_bar_123' ] }, + ratedata: { foo: [ 'first_bar_123' ] }, }, { - delta: { foo: [ 'first_bar' ] }, - bucket: { foo: [ 'first_bar' ] }, - doc_id: 234, + doc_id: 234, + delta: { foo: [ 'first_bar_234' ] }, + bucket: { foo: [ 'first_bar_234' ] }, + ratedata: {}, }, { - delta: { foo: [ 'second_bar' ] }, - bucket: { foo: [ 'second_bar' ] }, - doc_id: 234, + doc_id: 234, + delta: { foo: [ 'second_bar_234' ] }, + bucket: { foo: [ 'second_bar_234' ] }, + ratedata: {}, }, { - delta: { foo: [ 'third_bar' ] }, - bucket: { foo: [ 'third_bar' ] }, - doc_id: 234, + doc_id: 234, + delta: { foo: [ 'third_bar_234' ] }, + bucket: { foo: [ 'third_bar_234' ] }, + ratedata: {}, }, { - delta: { foo: [ 'first_bar' ] }, - bucket: { foo: [ 'first_bar' ] }, - doc_id: 345, + doc_id: 345, + delta: { foo: [ 'first_bar_345' ] }, + bucket: { foo: [ 'start_bar_345' ] }, + ratedata: { foo: [ 'first_bar_345' ] }, }, { - delta: { foo: [ 'second_bar' ] }, - bucket: { foo: [ 'second_bar' ] }, - doc_id: 345, + doc_id: 345, + delta: { foo: [ 'second_bar_345' ] }, + bucket: { foo: [ 'start_bar_345' ] }, + ratedata: { foo: [ 'second_bar_345' ] }, }, ], }, - ] ).forEach( ( { given, expected, label } ) => it( label, () => + ] ).forEach( ( { label, given, expected } ) => it( label, () => { let published: any = []; const dao = createMockDeltaDao(); const publisher = createMockDeltaPublisher(); const emitter = new EventEmitter(); - dao.getUnprocessedDocuments = (): Promise[]> => + dao.getUnprocessedDocuments = (): Promise => { return Promise.resolve( given ); } - publisher.publish = ( delta, bucket, doc_id ): Promise => + publisher.publish = ( + doc_id, + delta, + bucket, + ratedata, + ): Promise => { published.push( { - delta: delta.data, - bucket: bucket, - doc_id: doc_id, + doc_id: doc_id, + delta: delta.data, + bucket: bucket, + ratedata: ratedata, } ); return Promise.resolve(); @@ -479,6 +500,203 @@ describe( 'system.DeltaProcessor', () => .then( _ => expect( published ).to.deep.equal( expected ) ); } ) ); } ); + + + describe( 'Error handling', () => + { + it( 'Marks document in error state and continues', () => + { + let published: any = []; + let error_flag_set = false; + const dao = createMockDeltaDao(); + const publisher = createMockDeltaPublisher(); + const emitter = new EventEmitter(); + const doc = [ { + id: 123, + lastUpdate: 123123123, + data: { foo: [ 'start_bar' ] }, + ratedata: {}, + rdelta: { + data: [ + { + data: { foo: [ 'first_bar' ] }, + timestamp: 123123, + type: 'data', + } + ], + ratedata: [], + }, + }, + { + id: 234, + lastUpdate: 123123123, + data: { foo: [ 'start_bar' ] }, + ratedata: {}, + rdelta: { + data: [ + { + data: { foo: [ 'first_bar' ] }, + timestamp: 123123, + type: 'data', + } + ], + ratedata: [], + }, + } ]; + + const expected_published = [ + { + doc_id: 123, + delta: { foo: [ 'first_bar' ] }, + bucket: { foo: [ 'first_bar' ] }, + ratedata: {}, + }, + { + doc_id: 234, + delta: { foo: [ 'first_bar' ] }, + bucket: { foo: [ 'first_bar' ] }, + ratedata: {}, + } + ]; + + const expected_error = 'Uh oh'; + + dao.getUnprocessedDocuments = (): Promise => + Promise.resolve( doc ); + + dao.markDocumentAsProcessed = ( _doc_id, _ts ): Promise => + Promise.reject( new Error( expected_error ) ); + + dao.setErrorFlag = (): Promise => + { + error_flag_set = true; + return Promise.resolve(); + } + + publisher.publish = ( + doc_id, + delta, + bucket, + ratedata, + ): Promise => + { + published.push( { + doc_id: doc_id, + delta: delta.data, + bucket: bucket, + ratedata: ratedata, + } ); + + return Promise.resolve(); + } + + // Prevent node from converting an error event into an error + emitter.on( 'error', () => {} ); + + return expect( new Sut( dao, publisher, emitter ).process() ) + .to.eventually.deep.equal( undefined ) + .then( _ => + { + expect( error_flag_set ).to.be.true; + expect( published ).to.deep.equal( expected_published ); + } ); + } ); + } ); + + + describe( 'Error handling', () => + { + it( 'Failure to set document error state further processing', () => + { + let published: any = []; + let caught_error = ''; + const dao = createMockDeltaDao(); + const publisher = createMockDeltaPublisher(); + const emitter = new EventEmitter(); + const doc = [ { + id: 123, + lastUpdate: 123123123, + data: { foo: [ 'start_bar' ] }, + ratedata: {}, + rdelta: { + data: [ + { + data: { foo: [ 'first_bar' ] }, + timestamp: 123123, + type: 'data', + } + ], + ratedata: [], + }, + }, + { + id: 234, + lastUpdate: 123123123, + data: { foo: [ 'start_bar' ] }, + ratedata: {}, + rdelta: { + data: [ + { + data: { foo: [ 'first_bar' ] }, + timestamp: 123123, + type: 'data', + } + ], + ratedata: [], + }, + } ]; + + // Only one is published + const expected_published = [ { + doc_id: 123, + delta: { foo: [ 'first_bar' ] }, + bucket: { foo: [ 'first_bar' ] }, + ratedata: {}, + } ]; + + const expected_error = 'Uh oh'; + + dao.getUnprocessedDocuments = (): Promise => + Promise.resolve( doc ); + + dao.markDocumentAsProcessed = ( _doc_id, _ts ): Promise => + Promise.reject( new Error( 'Couldn\'t mark document' ) ); + + dao.setErrorFlag = (): Promise => + Promise.reject( new Error( expected_error ) ); + + publisher.publish = ( + doc_id, + delta, + bucket, + ratedata, + ): Promise => + { + published.push( { + doc_id: doc_id, + delta: delta.data, + bucket: bucket, + ratedata: ratedata, + } ); + + return Promise.resolve(); + } + + // Prevent node from converting an error event into an error + emitter.on( 'error', () => {} ); + + return expect( + new Sut( dao, publisher, emitter ).process() + .catch( e => { caught_error = e.message } ) + ) + .to.eventually.deep.equal( undefined ) + .then( _ => + { + expect( caught_error ).to.equal( expected_error ); + expect( published ).to.deep.equal( expected_published ); + } ); + } ); + } ); } ); diff --git a/test/system/DeltaPublisherTest.ts b/test/system/DeltaPublisherTest.ts index 40663d0..fcb788c 100644 --- a/test/system/DeltaPublisherTest.ts +++ b/test/system/DeltaPublisherTest.ts @@ -19,13 +19,25 @@ * along with this program. If not, see . */ +import { AmqpConnection } from '../../src/system/amqp/AmqpConnection'; +import { Delta, DeltaResult, DeltaType } from '../../src/bucket/delta'; import { DeltaPublisher as Sut } from '../../src/system/DeltaPublisher'; -import { AmqpConfig } from '../../src/system/AmqpPublisher'; +import { DocumentId } from '../../src/document/Document'; +import { Duplex } from 'stream'; import { EventEmitter } from "events"; +import { hasContext } from '../../src/error/ContextError'; +import { AmqpError } from '../../src/error/AmqpError'; +import { Channel } from 'amqplib'; +import { + createAvroEncoder, + AvroEncoderCtr, + AvroSchema, +} from '../../src/system/avro/AvroFactory'; import { expect, use as chai_use } from 'chai'; chai_use( require( 'chai-as-promised' ) ); +const sinon = require( 'sinon' ); describe( 'server.DeltaPublisher', () => { @@ -33,24 +45,96 @@ describe( 'server.DeltaPublisher', () => { it( 'sends a message', () => { - const conf = createMockConf(); - const emitter = new EventEmitter(); + let publish_called = false; + const delta = createMockDelta(); + const bucket = createMockBucketData(); + const ratedata = createMockBucketData(); + const emitter = new EventEmitter(); + const conn = createMockAmqpConnection(); + conn.getAmqpChannel = () => + { + return { + publish: ( _: any, __: any, buf: any, ___: any ) => + { + expect( buf instanceof Buffer ).to.be.true; - console.log( new Sut( conf, emitter, ts_ctr ) ); - expect( true ).to.be.true + publish_called = true; + + return true; + } + }; + }; + + const sut = new Sut( emitter, ts_ctr, createAvroEncoder, conn ); + + return expect( + sut.publish( 123, delta, bucket, ratedata ) + ).to.eventually.deep.equal( undefined ) + .then( _ => + { + expect( publish_called ).to.be.true; + } ); } ); - } ); - describe( '#sendMessage', () => - { - it( 'sends a message', () => + ( <[string, () => Channel | undefined, Error, string ][]>[ + [ + 'Throws an error when publishing was unsuccessful', + () => + { + return { + publish: ( _: any, __: any, _buf: any, ___: any ) => + { + return false; + } + }; + }, + Error, + 'Delta publish failed' + ], + [ + 'Throws an error when no amqp channel is found', + () => + { + return undefined; + }, + AmqpError, + 'Error sending message: No channel' + ] + ] ).forEach( ( [ label, getChannelF, error_type, err_msg ] ) => + it( label, () => { - const conf = createMockConf(); - const emitter = new EventEmitter(); + const delta = createMockDelta(); + const bucket = createMockBucketData(); + const ratedata = createMockBucketData(); + const emitter = new EventEmitter(); + const conn = createMockAmqpConnection(); + const doc_id = 123; + const expected = { + doc_id: doc_id, + delta_type: delta.type, + delta_ts: delta.timestamp + } - console.log( new Sut( conf, emitter, ts_ctr ) ); - expect( true ).to.be.true - } ); + conn.getAmqpChannel = getChannelF; + + const result = new Sut( emitter, ts_ctr, createAvroEncoder, conn ) + .publish( doc_id, delta, bucket, ratedata ); + + return Promise.all( [ + expect( result ).to.eventually.be.rejectedWith( + error_type, err_msg + ), + result.catch( e => + { + if ( !hasContext( e ) ) + { + return expect.fail(); + } + + return expect( e.context ).to.deep.equal( expected ); + } ) + ] ); + } ) ); } ); describe( '#avroEncode parses', () => @@ -137,32 +221,26 @@ describe( 'server.DeltaPublisher', () => { it( label, () => { - let errorCalled = false; + const emitter = createMockEventEmitter(); + const conn = createMockAmqpConnection(); + const data = createMockData( delta_data ); + const sut = new Sut( + emitter, + ts_ctr, + createAvroEncoder, + conn, + ); - const emitter = { - emit( _event_id, _err ) + sut.avroEncode( data ) + .then( b => { - errorCalled = true; - - console.log( 'server.DeltaPublisher.Error' + _err ); - } - } - - const conf = createMockConf(); - const data = createMockData( delta_data ); - const sut = new Sut( conf, emitter, ts_ctr ); - const buffer = sut.avroEncode( data ); - - if ( valid ) - { - expect( typeof(buffer) ).to.equal( 'object' ); - } - else - { - expect( buffer ).to.equal( null ); - } - - expect( valid ).to.equal( !errorCalled ); + expect( typeof(b) ).to.equal( 'object' ); + expect( valid ).to.be.true; + } ) + .catch( _ => + { + expect( valid ).to.be.false; + } ); } ); } ); } ); @@ -301,9 +379,16 @@ describe( 'server.DeltaPublisher', () => { it( label, () => { - const emitter = {} - const conf = createMockConf(); - const sut = new Sut( conf, emitter, ts_ctr ); + const encoded = 'FooBar'; + const emitter = createMockEventEmitter(); + const conn = createMockAmqpConnection(); + const avroEncoderCtr = createMockEncoder( encoded ); + const sut = new Sut( + emitter, + ts_ctr, + avroEncoderCtr, + conn, + ); const actual = sut.setDataTypes( delta_data ); expect( actual ).to.deep.equal( expected ); @@ -312,14 +397,39 @@ describe( 'server.DeltaPublisher', () => } ); } ); + function ts_ctr(): UnixTimestamp { return Math.floor( new Date().getTime() / 1000 ); } -function createMockConf(): AmqpConfig + +function createMockEncoder( mock_encoded_data: string ): AvroEncoderCtr { - return {}; + return ( _schema: AvroSchema ) => + { + const mock = sinon.mock( Duplex ); + + mock.on = ( _: string, __: any ) => {}; + mock.end = ( _: any ) => { return mock_encoded_data; }; + + return mock; + }; +} + + +function createMockEventEmitter(): EventEmitter +{ + return {}; +} + + +function createMockAmqpConnection(): AmqpConnection +{ + return { + connect: () => {}, + getExchangeName: () => { 'Foo' }, + }; } @@ -339,11 +449,8 @@ function createMockData( delta_data: any ): any modified: 1573856916, top_visited_step: '2', }, - session: { - entity_name: 'Foobar', - entity_id: 123123 , - }, - data: null, + data: null, + ratedata: null, delta: { Data: { bucket: delta_data, @@ -356,4 +463,22 @@ function createMockData( delta_data: any ): any }, }, }; +} + + +function createMockBucketData(): Record +{ + return { + foo: [ 'bar', 'baz' ] + } +} + + +function createMockDelta(): Delta +{ + return >{ + type: 'data', + timestamp: 123123123, + data: >{}, + } } \ No newline at end of file diff --git a/test/system/EventLoggerTest.ts b/test/system/EventLoggerTest.ts deleted file mode 100644 index b3d5f0f..0000000 --- a/test/system/EventLoggerTest.ts +++ /dev/null @@ -1,103 +0,0 @@ -/** - * Event logger test - * - * Copyright (C) 2010-2019 R-T Specialty, LLC. - * - * This file is part of the Liza Data Collection Framework. - * - * liza is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -import { EventLogger as Sut } from '../../src/system/EventLogger'; -import { EventEmitter } from "events"; -import { expect } from 'chai'; - -const sinon = require( 'sinon' ); - -declare interface MockConsole extends Console { - getLevel(): string, -} - -describe( 'system.EventLogger captures and logs events', () => -{ - [ - { - event_id: 'document-processed', - console_level: 'log', - }, - { - event_id: 'delta-publish', - console_level: 'log', - }, - { - event_id: 'amqp-conn-error', - console_level: 'warn', - }, - { - event_id: 'amqp-reconnect', - console_level: 'warn', - }, - { - event_id: 'amqp-reconnect-fail', - console_level: 'error', - }, - { - event_id: 'avro-err', - console_level: 'error', - }, - { - event_id: 'dao-err', - console_level: 'error', - }, - { - event_id: 'publish-err', - console_level: 'error', - }, - ].forEach( ( { event_id, console_level } ) => - { - it( event_id + ' triggers console output level: ' + console_level, () => - { - const emitter = new EventEmitter(); - const con = createMockConsole(); - const env = 'test'; - - new Sut( con, env, emitter, ts_ctr ); - - emitter.emit( event_id ); - - expect( con.getLevel() ).to.equal( console_level ); - } ); - } ); -} ); - - -function ts_ctr(): UnixTimestamp -{ - return Math.floor( new Date().getTime() / 1000 ); -} - - -function createMockConsole(): MockConsole -{ - const mock = sinon.mock( console ); - - mock.level = ''; - mock.info = ( _str: string ) => { mock.level = 'info'; }; - mock.log = ( _str: string ) => { mock.level = 'log'; }; - mock.warn = ( _str: string ) => { mock.level = 'warn'; }; - mock.error = ( _str: string ) => { mock.level = 'error'; }; - mock.getLevel = () => mock.level; - - return mock; -} \ No newline at end of file diff --git a/test/system/EventMediatorTest.ts b/test/system/EventMediatorTest.ts new file mode 100644 index 0000000..581437c --- /dev/null +++ b/test/system/EventMediatorTest.ts @@ -0,0 +1,139 @@ +/** + * Event logger test + * + * Copyright (C) 2010-2019 R-T Specialty, LLC. + * + * This file is part of the Liza Data Collection Framework. + * + * liza is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +import { EventMediator as Sut } from '../../src/system/EventMediator'; +import { context } from '../../src/error/ContextError'; +import { EventEmitter } from "events"; +import { expect } from 'chai'; +import { PsrLogger } from '../../src/system/PsrLogger'; + + +describe( 'system.EventLogger captures and logs events', () => +{ + it( 'document-processed triggers log#notice', () => + { + let method_called = false; + + const event_id = 'document-processed'; + const emitter = new EventEmitter(); + const log = createMockLogger(); + + log.notice = ( _str: string ) => { method_called = true; }; + + new Sut( log, emitter ); + + emitter.emit( event_id ); + + expect( method_called ).to.be.true; + } ); + + it( 'delta-publish triggers log#notice', () => + { + let method_called = false; + + const event_id = 'delta-publish'; + const emitter = new EventEmitter(); + const log = createMockLogger(); + + log.notice = ( _str: string ) => { method_called = true; }; + + new Sut( log, emitter ); + + emitter.emit( event_id ); + + expect( method_called ).to.be.true; + } ); + + it( 'amqp-conn-error triggers log#warning', () => + { + let method_called = false; + + const event_id = 'amqp-conn-error'; + const emitter = new EventEmitter(); + const log = createMockLogger(); + + log.warning = ( _str: string ) => { method_called = true; }; + + new Sut( log, emitter ); + + emitter.emit( event_id ); + + expect( method_called ).to.be.true; + } ); + + it( 'amqp-reconnect triggers log#warning', () => + { + let method_called = false; + + const event_id = 'amqp-reconnect'; + const emitter = new EventEmitter(); + const log = createMockLogger(); + + log.warning = ( _str: string ) => { method_called = true; }; + + new Sut( log, emitter ); + + emitter.emit( event_id ); + + expect( method_called ).to.be.true; + } ); + + it( 'context is retrieved from error', () => + { + let method_called = false; + + const event_id = 'error'; + const err_msg = 'Foo'; + const emitter = new EventEmitter(); + const log = createMockLogger(); + const err_context = { bar: 'baz' }; + + log.error = ( str: string, context: any ) => + { + method_called = true; + + expect( str ).to.equal( err_msg ); + expect( context ).to.equal( err_context ); + }; + + new Sut( log, emitter ); + + emitter.emit( event_id, context( new Error( err_msg ), err_context ) ); + + expect( method_called ).to.be.true; + } ); +} ); + + +function createMockLogger(): PsrLogger +{ + return { + debug( _msg: string | object, _context: object ){}, + info( _msg: string | object, _context: object ){}, + notice( _msg: string | object, _context: object ){ console.log( 'asdasd msg: ', _msg ); }, + warning( _msg: string | object, _context: object ){}, + error( _msg: string | object, _context: object ){}, + critical( _msg: string | object, _context: object ){}, + alert( _msg: string | object, _context: object ){}, + emergency( _msg: string | object, _context: object ){}, + log( _level: any, _msg: string | object, _context: object ){}, + }; +} \ No newline at end of file diff --git a/test/system/MetricsCollectorTest.ts b/test/system/MetricsCollectorTest.ts index 07867a8..eafc77d 100644 --- a/test/system/MetricsCollectorTest.ts +++ b/test/system/MetricsCollectorTest.ts @@ -19,13 +19,15 @@ * along with this program. If not, see . */ -import { PrometheusFactory } from '../../src/system/PrometheusFactory'; +import { + PrometheusFactory, + PrometheusConfig, +} from '../../src/system/PrometheusFactory'; import { Histogram, Pushgateway, Counter, Gauge } from 'prom-client'; import { EventEmitter } from 'events'; import { expect } from 'chai'; import { MetricsCollector as Sut, - PrometheusConfig, MetricTimer, } from '../../src/system/MetricsCollector'; @@ -35,8 +37,8 @@ describe( 'system.MetricsCollector captures events and pushes metrics', () => { it( 'process-complete event is hooked', () => { - let histogram_called = false; - let counter_called = false; + let histogram_called = false; + let counter_called = false; const emitter = new EventEmitter(); const conf = createMockConfig(); @@ -46,18 +48,20 @@ describe( 'system.MetricsCollector captures events and pushes metrics', () => counter_cb: () => { counter_called = true }, } ); - new Sut( factory, conf, emitter, timer ); + const sut = new Sut( factory, conf, emitter, timer ); emitter.emit( 'delta-process-end' ); expect( histogram_called ).to.be.true; expect( counter_called ).to.be.true; + + sut.stop(); } ); it( 'process-error event is hooked', () => { - let counter_called = false; + let counter_called = false; const emitter = new EventEmitter(); const conf = createMockConfig(); @@ -66,11 +70,13 @@ describe( 'system.MetricsCollector captures events and pushes metrics', () => counter_cb: () => { counter_called = true }, } ); - new Sut( factory, conf, emitter, timer ); + const sut = new Sut( factory, conf, emitter, timer ); emitter.emit( 'delta-process-error' ); expect( counter_called ).to.be.true; + + sut.stop(); } ); @@ -80,7 +86,7 @@ describe( 'system.MetricsCollector captures events and pushes metrics', () => const uid = 'foo'; const start_time_ns = 1234; const end_time_ns = 5678; - const expected_ms = ( end_time_ns - start_time_ns ) / 1000; + const expected_ms = ( end_time_ns - start_time_ns ) / 1000000; const emitter = new EventEmitter(); const conf = createMockConfig(); const timer = createMockTimer( start_time_ns, end_time_ns ); @@ -88,12 +94,14 @@ describe( 'system.MetricsCollector captures events and pushes metrics', () => histogram_cb: ( n: number ) => { actual_ms = n }, } ); - new Sut( factory, conf, emitter, timer ); + const sut = new Sut( factory, conf, emitter, timer ); emitter.emit( 'delta-process-start', uid ); emitter.emit( 'delta-process-end', uid ); expect( actual_ms ).to.be.equal( expected_ms ); + + sut.stop(); } ); } ); diff --git a/test/system/StandardLoggerTest.ts b/test/system/StandardLoggerTest.ts new file mode 100644 index 0000000..918bfd1 --- /dev/null +++ b/test/system/StandardLoggerTest.ts @@ -0,0 +1,178 @@ +/** + * Event logger test + * + * Copyright (C) 2010-2019 R-T Specialty, LLC. + * + * This file is part of the Liza Data Collection Framework. + * + * liza is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +import { StandardLogger as Sut } from '../../src/system/StandardLogger'; +import { LogLevel } from '../../src/system/PsrLogger'; +import { expect } from 'chai'; + +const sinon = require( 'sinon' ); + +declare interface MockConsole extends Console { + getLevel(): string, + getStr(): string, +} + +describe( 'system.EventLogger captures and logs events', () => +{ + it( 'debug triggers console output level: info', () => + { + const con = createMockConsole(); + const env = 'test'; + const sut = new Sut( con, ts_ctr, env ); + + sut.debug( 'Foo' ); + + expect( con.getLevel() ).to.equal( 'info' ); + } ); + + it( 'info triggers console output level: info', () => + { + const con = createMockConsole(); + const env = 'test'; + const sut = new Sut( con, ts_ctr, env ); + + sut.info( 'Foo' ); + + expect( con.getLevel() ).to.equal( 'info' ); + } ); + + it( 'notice triggers console output level: log', () => + { + const con = createMockConsole(); + const env = 'test'; + const sut = new Sut( con, ts_ctr, env ); + + sut.notice( 'Foo' ); + + expect( con.getLevel() ).to.equal( 'log' ); + } ); + + it( 'warning triggers console output level: warn', () => + { + const con = createMockConsole(); + const env = 'test'; + const sut = new Sut( con, ts_ctr, env ); + + sut.warning( 'Foo' ); + + expect( con.getLevel() ).to.equal( 'warn' ); + } ); + + it( 'error triggers console output level: error', () => + { + const con = createMockConsole(); + const env = 'test'; + const sut = new Sut( con, ts_ctr, env ); + + sut.error( 'Foo' ); + + expect( con.getLevel() ).to.equal( 'error' ); + } ); + + it( 'critical triggers console output level: error', () => + { + const con = createMockConsole(); + const env = 'test'; + const sut = new Sut( con, ts_ctr, env ); + + sut.critical( 'Foo' ); + + expect( con.getLevel() ).to.equal( 'error' ); + } ); + + it( 'alert triggers console output level: error', () => + { + const con = createMockConsole(); + const env = 'test'; + const sut = new Sut( con, ts_ctr, env ); + + sut.alert( 'Foo' ); + + expect( con.getLevel() ).to.equal( 'error' ); + } ); + + it( 'emergency triggers console output level: error', () => + { + const con = createMockConsole(); + const env = 'test'; + const sut = new Sut( con, ts_ctr, env ); + + sut.emergency( 'Foo' ); + + expect( con.getLevel() ).to.equal( 'error' ); + } ); + + it( 'log triggers corresponding log level', () => + { + const con = createMockConsole(); + const env = 'test'; + const sut = new Sut( con, ts_ctr, env ); + + sut.log( LogLevel.ERROR, 'Foo' ); + + expect( con.getLevel() ).to.equal( 'error' ); + } ); + + it( 'Context is included in structured output', () => + { + const con = createMockConsole(); + const env = 'test'; + const sut = new Sut( con, ts_ctr, env ); + const context = { bar: 'baz' }; + const expected_output = { + message: 'Foo', + timestamp: 123123, + service: 'quote-server', + env: 'test', + severity: 'NOTICE', + context: { + bar: 'baz', + }, + }; + + sut.notice( 'Foo', context ); + + expect( con.getStr() ).to.deep.equal( expected_output ); + } ); +} ); + + +function ts_ctr(): UnixTimestamp +{ + return 123123; +} + + +function createMockConsole(): MockConsole +{ + const mock = sinon.mock( console ); + + mock.lvl = ''; + mock.str = ''; + mock.info = ( str: string ) => { mock.str = str; mock.lvl = 'info'; }; + mock.log = ( str: string ) => { mock.str = str; mock.lvl = 'log'; }; + mock.warn = ( str: string ) => { mock.str = str; mock.lvl = 'warn'; }; + mock.error = ( str: string ) => { mock.str = str; mock.lvl = 'error'; }; + mock.getLevel = () => mock.lvl; + mock.getStr = () => mock.str; + + return mock; +}