diff --git a/.env b/.env index acb0b79..0bf93e2 100644 --- a/.env +++ b/.env @@ -1,15 +1,15 @@ NODE_ENV=dev amqp_hostname=localhost amqp_port=5672 -amqp_username=quote_referral +amqp_username= amqp_password= amqp_frameMax=0 amqp_heartbeat=2 -amqp_vhost=quote -amqp_exchange=quoteupdate +amqp_vhost= +amqp_exchange= amqp_retries=30 amqp_retry_wait=1 -prom_hostname=dmz2docker01.rsgcorp.local +prom_hostname= prom_port=9091 prom_push_interval_ms=5000 process_interval_ms=2000 diff --git a/bin/delta-processor.ts b/bin/delta-processor.ts index 40842de..522b98b 100644 --- a/bin/delta-processor.ts +++ b/bin/delta-processor.ts @@ -18,34 +18,43 @@ * You should have received a copy of the GNU General Public License * along with this program. If not, see . */ -import fs = require( 'fs' ); - -import { AmqpConfig } from '../src/system/AmqpPublisher'; +import { createAmqpConfig } from '../src/system/AmqpPublisher'; import { MongoDeltaDao } from '../src/system/db/MongoDeltaDao'; import { DeltaProcessor } from '../src/system/DeltaProcessor'; import { DeltaPublisher } from '../src/system/DeltaPublisher'; -import { MongoDb, MongoDbConfig, MongoCollection } from '../src/types/mongodb'; -import { EventLogger } from '../src/system/EventLogger'; -import { EventEmitter } from 'events'; -import { PrometheusFactory } from '../src/system/PrometheusFactory'; +import { MongoCollection } from '../src/types/mongodb'; +import { createAvroEncoder } from '../src/system/avro/AvroFactory'; import { - MetricsCollector, - PrometheusConfig, -} from '../src/system/MetricsCollector'; + createMongoConfig, + createMongoDB, + getMongoCollection, +} from '../src/system/db/MongoFactory'; +import { EventMediator } from '../src/system/EventMediator'; +import { EventEmitter } from 'events'; +import { StandardLogger } from '../src/system/StandardLogger'; +import { MetricsCollector } from '../src/system/MetricsCollector'; +import { + PrometheusFactory, + createPrometheusConfig, +} from '../src/system/PrometheusFactory'; +import { AmqpConnection } from '../src/system/amqp/AmqpConnection'; -const { - Db: MongoDb, - Server: MongoServer, - ReplServers: ReplSetServers, -} = require( 'mongodb' ); -const amqp_conf = _getAmqpConfig( process.env ); -const db_conf = _getMongoConfig( process.env ); -const prom_conf = _getPrometheusConfig( process.env ); +const amqp_conf = createAmqpConfig( process.env ); +const prom_conf = createPrometheusConfig( process.env ); +const db_conf = createMongoConfig( process.env ); +const db = createMongoDB( db_conf ); +const process_interval_ms = +( process.env.process_interval_ms || 2000 ); const env = process.env.NODE_ENV || 'Unknown Environment'; -const process_interval_ms = +(process.env.process_interval_ms || 2000); const emitter = new EventEmitter(); -const db = _createDB( db_conf ); +const log = new StandardLogger( console, ts_ctr, env ); +const amqp_connection = new AmqpConnection( amqp_conf, emitter ); +const publisher = new DeltaPublisher( + emitter, + ts_ctr, + createAvroEncoder, + amqp_connection, +); // Prometheus Metrics const prom_factory = new PrometheusFactory(); @@ -57,67 +66,47 @@ const metrics = new MetricsCollector( ); // Structured logging -new EventLogger( console, env, emitter, ts_ctr ); +new EventMediator( log, emitter ); let process_interval: NodeJS.Timer; let dao: MongoDeltaDao; -let publisher: DeltaPublisher; -let processor: DeltaProcessor; -_getMongoCollection( db, db_conf ) +getMongoCollection( db, db_conf ) .then( ( conn: MongoCollection ) => { return new MongoDeltaDao( conn ); } ) - .then( ( mongoDao: MongoDeltaDao ) => - { - dao = mongoDao; - publisher = new DeltaPublisher( amqp_conf, emitter, ts_ctr ); - processor = new DeltaProcessor( mongoDao, publisher, emitter ); - } ) - .then( _ => publisher.connect() ) + .then( ( mongoDao: MongoDeltaDao ) => { dao = mongoDao; } ) + .then( _ => amqp_connection.connect() ) .then( _ => - { - const pidPath = __dirname + '/../conf/.delta_processor.pid'; + { + log.info( 'Liza Delta Processor' ); - writePidFile(pidPath ); - greet( 'Liza Delta Processor', pidPath ); + handleShutdown(); - process_interval = setInterval( () => + const processor = new DeltaProcessor( dao, publisher, emitter ); + + process_interval = setInterval( () => { processor.process(); - metrics.checkForErrors( dao ); + + dao.getErrorCount() + .then( count => { metrics.updateErrorCount( count ) } ); }, process_interval_ms, ); } ) - .catch( e => { console.error( 'Error: ' + e ); } ); + .catch( e => + { + log.error( e ); + process.exit( 1 ); + } ); /** - * Output greeting - * - * The greeting contains the program name and PID file path. - * - * @param name - program name - * @param pid_path - path to PID file + * Hook shutdown events */ -function greet( name: string, pid_path: string ): void +function handleShutdown(): void { - console.log( `${name}`); - console.log( `PID file: ${pid_path}` ); -} - - -/** - * Write process id (PID) file - * - * @param pid_path - path to pid file - */ -function writePidFile( pid_path: string ): void -{ - fs.writeFileSync( pid_path, process.pid ); - process.on( 'SIGINT', () => { shutdown( 'SIGINT' ); } ) - .on( 'SIGTERM', () => { shutdown( 'SIGTERM' ); } ) - .on( 'exit', () => { fs.unlink( pid_path, () => {} ); } ); + .on( 'SIGTERM', () => { shutdown( 'SIGTERM' ); } ); } @@ -128,12 +117,12 @@ function writePidFile( pid_path: string ): void */ function shutdown( signal: string ): void { - console.log( 'Received ' + signal + '. Beginning graceful shutdown:' ); - console.log( '...Stopping processing interval' ); + log.info( 'Received ' + signal + '. Beginning graceful shutdown:' ); + log.info( '...Stopping processing interval' ); clearInterval( process_interval ); - console.log( '...Closing MongoDb connection' ); + log.info( '...Closing MongoDb connection' ); db.close( ( err, _data ) => { @@ -143,11 +132,15 @@ function shutdown( signal: string ): void } } ); - console.log( '...Closing AMQP connection...' ); + log.info( '...Closing AMQP connection...' ); - publisher.close(); + amqp_connection.close(); - console.log( 'Shutdown complete. Exiting.' ); + log.info( '...Stopping the metrics collector...' ); + + metrics.stop(); + + log.info( 'Shutdown complete. Exiting.' ); process.exit(); } @@ -161,179 +154,3 @@ function ts_ctr(): UnixTimestamp { return Math.floor( new Date().getTime() / 1000 ); } - - -/** - * Create the database connection - * - * @param conf - the configuration from the environment - * - * @return the mongodb connection - */ -function _createDB( conf: MongoDbConfig ): MongoDb -{ - if( conf.ha ) - { - var mongodbPort = conf.port || 27017; - var mongodbReplSet = conf.replset || 'rs0'; - var dbServers = new ReplSetServers( - [ - new MongoServer( conf.host_a, conf.port_a || mongodbPort), - new MongoServer( conf.host_b, conf.port_b || mongodbPort), - ], - {rs_name: mongodbReplSet, auto_reconnect: true} - ); - } - else - { - var dbServers = new MongoServer( - conf.host || '127.0.0.1', - conf.port || 27017, - {auto_reconnect: true} - ); - } - var db = new MongoDb( - 'program', - dbServers, - {native_parser: false, safe: false} - ); - return db; -} - - -/** - * Attempts to connect to the database - * - * connectError event will be emitted on failure. - * - * @return any errors that occurred - */ -function _getMongoCollection( - db: MongoDb, - conf: MongoDbConfig -): Promise -{ - return new Promise( ( resolve, reject ) => - { - // attempt to connect to the database - db.open( ( e: any, db: any ) => - { - // if there was an error, don't bother with anything else - if ( e ) - { - // in some circumstances, it may just be telling us that - // we're already connected (even though the connection may - // have been broken) - if ( e.errno !== undefined ) - { - reject( 'Error opening mongo connection: ' + e ); - return; - } - } else if ( db == null ) - { - reject( 'No database connection' ); - return; - } - - // quotes collection - db.collection( - conf.collection, - ( e: any, collection: MongoCollection ) => - { - if ( e ) - { - reject( 'Error creating collection: ' + e ); - return; - } - - // initialize indexes - collection.createIndex( - [ - ['published', 1], - ['deltaError', 1], - ], - true, - ( e: any, _index: { [P: string]: any } ) => - { - if ( e ) - { - reject( 'Error creating index: ' + e ); - return; - } - - resolve( collection ); - return; - } - ); - } - ); - } ); - } ); -} - - -/** - * Create a mongodb configuration from the environment - * - * @param env - the environment variables - * - * @return the mongo configuration - */ -function _getMongoConfig( env: any ): MongoDbConfig -{ - return { - 'port': +( env.MONGODB_PORT || 0 ), - 'ha': +( env.LIZA_MONGODB_HA || 0 ) == 1, - 'replset': env.LIZA_MONGODB_REPLSET, - 'host': env.MONGODB_HOST, - 'host_a': env.LIZA_MONGODB_HOST_A, - 'port_a': +( env.LIZA_MONGODB_PORT_A || 0 ), - 'host_b': env.LIZA_MONGODB_HOST_B, - 'port_b': +( env.LIZA_MONGODB_PORT_B || 0 ), - 'collection': 'quotes', - }; -} - - -/** - * Create an amqp configuration from the environment - * - * @param env - the environment variables - * - * @return the amqp configuration - */ -function _getAmqpConfig( env: any ): AmqpConfig -{ - return { - 'protocol': 'amqp', - 'hostname': env.amqp_hostname, - 'port': +( env.amqp_port || 0 ), - 'username': env.amqp_username, - 'password': env.amqp_password, - 'locale': 'en_US', - 'frameMax': env.amqp_frameMax, - 'heartbeat': env.amqp_heartbeat, - 'vhost': env.amqp_vhost, - 'exchange': env.amqp_exchange, - 'retries': env.amqp_retries || 30, - 'retry_wait': env.amqp_retry_wait || 1000, - }; -} - - -/** - * Create a prometheus configuration from the environment - * - * @param env - the environment variables - * - * @return the prometheus configuration - */ -function _getPrometheusConfig( env: any ): PrometheusConfig -{ - return { - 'hostname': env.prom_hostname, - 'port': +( env.prom_port || 0 ), - 'env': process.env.NODE_ENV, - 'push_interval_ms': +( process.env.prom_push_interval_ms || 5000 ), - }; -} \ No newline at end of file diff --git a/src/bucket/delta.ts b/src/bucket/delta.ts index ef2ab77..b83a8e7 100644 --- a/src/bucket/delta.ts +++ b/src/bucket/delta.ts @@ -18,16 +18,21 @@ * You should have received a copy of the GNU General Public License * along with this program. If not, see . */ +import { DocumentId } from '../document/Document'; + /** The data structure expected for a document's internal key/value store */ export type Kv = Record; + /** Possible delta values for Kv array indexes */ export type DeltaDatum = T | null | undefined; + /** Possible delta types */ export type DeltaType = 'ratedata' | 'data'; + /** * The constructor type for a delta generating function * @@ -46,6 +51,53 @@ export type DeltaConstructor = Kv, V extends Kv = export type DeltaResult = { [K in keyof T]: DeltaDatum | null }; +/** Complete delta type */ +export type Delta = { + type: DeltaType, + timestamp: UnixTimestamp, + data: DeltaResult, +} + + +/** Reverse delta type */ +export type ReverseDelta = { + data: Delta[], + ratedata: Delta[], +} + + +/** Structure for Published delta count */ +export type PublishDeltaCount = { + data?: number, + ratedata?: number, +} + + +/** + * Document structure + */ +export interface DeltaDocument +{ + /** The document id */ + id: DocumentId, + + /** The time the document was updated */ + lastUpdate: UnixTimestamp, + + /** The data bucket */ + data: Record, + + /** The rate data bucket */ + ratedata?: Record, + + /** The calculated reverse deltas */ + rdelta?: ReverseDelta, + + /** A count of how many of each delta type have been processed */ + totalPublishDelta?: PublishDeltaCount, +}; + + /** * Create delta to transform from src into dest * @@ -105,7 +157,7 @@ export function createDelta, V extends Kv>( * @param bucket - The bucket data * @param delta - The delta to apply * - * @return the delta + * @return the bucket with the delta applied */ export function applyDelta, V extends Kv>( bucket: U = {}, @@ -164,7 +216,7 @@ export function applyDelta, V extends Kv>( * @param bucket - The bucket data array * @param delta - The delta data array * - * @return an object with an changed flag and a data array + * @return the applied delta */ function _applyDeltaKey( bucket: T[], diff --git a/src/server/Server.js b/src/server/Server.js index 8651f9d..d839d7b 100644 --- a/src/server/Server.js +++ b/src/server/Server.js @@ -340,7 +340,6 @@ module.exports = Class( 'Server' ) .setImported( quote_data.importedInd || false ) .setBound( quote_data.boundInd || false ) .needsImport( quote_data.importDirty || false ) - .needsDeltaProcessing( quote_data.processed || true ) .setCurrentStepId( quote_data.currentStepId || quote_program.getFirstStepId() @@ -393,7 +392,6 @@ module.exports = Class( 'Server' ) importedInd: ( quote.isImported() ) ? 1 : 0, boundInd: ( quote.isBound() ) ? 1 : 0, importDirty: 0, - published: 1, syncInd: 0, boundInd: 0, notifyInd: 0, diff --git a/src/server/db/MongoServerDao.ts b/src/server/db/MongoServerDao.ts index ef129b8..dd1df2d 100644 --- a/src/server/db/MongoServerDao.ts +++ b/src/server/db/MongoServerDao.ts @@ -20,7 +20,7 @@ */ import { ServerDao, Callback } from "./ServerDao"; -import { MongoCollection, MongoUpdate } from "mongodb"; +import { MongoCollection, MongoUpdate, MongoDb } from "mongodb"; import { PositiveInteger } from "../../numeric"; import { ServerSideQuote } from "../quote/ServerSideQuote"; import { QuoteId } from "../../document/Document"; @@ -64,7 +64,7 @@ export class MongoServerDao extends EventEmitter implements ServerDao * @param {Mongo.Db} db mongo database connection */ constructor( - private readonly _db: any + private readonly _db: MongoDb ) { super(); @@ -86,7 +86,7 @@ export class MongoServerDao extends EventEmitter implements ServerDao var dao = this; // map db error event (on connection error) to our connectError event - this._db.on( 'error', function( err: any ) + this._db.on( 'error', function( err: Error ) { dao._ready = false; dao._collection = null; @@ -165,7 +165,10 @@ export class MongoServerDao extends EventEmitter implements ServerDao collection.createIndex( [ ['id', 1] ], true, - function( _err: any, _index: { [P: string]: any } ) + function( + _err: NullableError, + _index: { [P: string]: any, + } ) { // mark the DAO as ready to be used dao._collection = collection; @@ -179,7 +182,7 @@ export class MongoServerDao extends EventEmitter implements ServerDao db.collection( dao.COLLECTION_SEQ, function( - err: any, + err: Error, collection: MongoCollection, ) { if ( err ) @@ -199,7 +202,7 @@ export class MongoServerDao extends EventEmitter implements ServerDao collection.find( { _id: dao.SEQ_QUOTE_ID }, { limit: 1 }, - function( err: any, cursor ) + function( err: NullableError, cursor ) { if ( err ) { @@ -207,7 +210,7 @@ export class MongoServerDao extends EventEmitter implements ServerDao return; } - cursor.toArray( function( _err: any, data: any[] ) + cursor.toArray( function( _err: Error, data: any[] ) { if ( data.length == 0 ) { @@ -236,7 +239,7 @@ export class MongoServerDao extends EventEmitter implements ServerDao _id: this.SEQ_QUOTE_ID, val: this.SEQ_QUOTE_ID_DEFAULT, }, - function( err: any, _docs: any ) + function( err: NullableError, _docs: any ) { if ( err ) { @@ -467,8 +470,8 @@ export class MongoServerDao extends EventEmitter implements ServerDao */ saveQuoteState( quote: ServerSideQuote, - success_callback: any, - failure_callback: any, + success_callback: Callback, + failure_callback: Callback, ) { var update = { @@ -486,8 +489,8 @@ export class MongoServerDao extends EventEmitter implements ServerDao saveQuoteClasses( quote: ServerSideQuote, classes: any, - success: any, - failure: any, + success: Callback, + failure: Callback, ) { return this.mergeData( diff --git a/src/system/AmqpPublisher.ts b/src/system/AmqpPublisher.ts index 12f521c..1f728b3 100644 --- a/src/system/AmqpPublisher.ts +++ b/src/system/AmqpPublisher.ts @@ -26,6 +26,32 @@ import { DocumentId } from '../document/Document'; import { Options } from 'amqplib'; +/** + * Create an amqp configuration from the environment + * + * @param env - the environment variables + * + * @return the amqp configuration + */ +export function createAmqpConfig( env: NodeJS.ProcessEnv ): AmqpConfig +{ + return { + protocol: 'amqp', + hostname: env.amqp_hostname, + port: +( env.amqp_port || 0 ), + username: env.amqp_username, + password: env.amqp_password, + locale: 'en_US', + frameMax: +( env.amqp_frameMax || 0 ), + heartbeat: +( env.amqp_heartbeat || 0 ), + vhost: env.amqp_vhost, + exchange: env.amqp_exchange, + retries: env.amqp_retries || 30, + retry_wait: env.amqp_retry_wait || 1000, + }; +} + + export interface AmqpConfig extends Options.Connect { /** The protocol to connect with (should always be "amqp") */ protocol: string; @@ -49,7 +75,7 @@ export interface AmqpConfig extends Options.Connect { frameMax: number; /** How often to check for a live connection */ - heartBeat: number; + heartbeat: number; /** The virtual host we are on (e.g. live, demo, test) */ vhost?: string; @@ -70,13 +96,15 @@ export interface AmqpPublisher /** * Publish quote message to exchange post-rating * - * @param delta - The delta - * @param bucket - The bucket - * @param doc_id - The doc_id + * @param doc_id - The doc_id + * @param delta - The delta + * @param bucket - The bucket + * @param ratedata - The rate data bucket */ publish( - delta: DeltaResult, - bucket: Record, - doc_id: DocumentId, + doc_id: DocumentId, + delta: DeltaResult, + bucket: Record, + ratedata?: Record, ): Promise } diff --git a/src/system/DeltaProcessor.ts b/src/system/DeltaProcessor.ts index d8da5cb..b136f80 100644 --- a/src/system/DeltaProcessor.ts +++ b/src/system/DeltaProcessor.ts @@ -20,10 +20,16 @@ */ import { DeltaDao } from "../system/db/DeltaDao"; -import { DeltaResult, DeltaType, applyDelta } from "../bucket/delta"; import { DocumentId } from "../document/Document"; import { AmqpPublisher } from "./AmqpPublisher"; import { EventEmitter } from "events"; +import { + DeltaType, + applyDelta, + DeltaDocument, + Delta, + ReverseDelta, +} from "../bucket/delta"; /** @@ -58,81 +64,84 @@ export class DeltaProcessor process(): Promise { return this._dao.getUnprocessedDocuments() - .then( docs => this._processNext( docs ) ) - .catch( err => { this._emitter.emit( 'dao-err', err ) } ); + .then( docs => this._processNext( docs ) ); } - private _processNext( docs: any ): Promise + private _processNext( docs: DeltaDocument[] ): Promise { - if ( docs.length === 0 ) + const doc = docs.shift(); + + if ( !doc ) { return Promise.resolve(); } - const doc = docs.shift(); - return this._processDocument( doc ) - .then( _ => this._processNext( docs ) ); + .then( _ => this._processNext( docs ) ) } - private _processDocument( doc: Record ): Promise + private _processDocument( doc: DeltaDocument ): Promise { - const deltas = this.getTimestampSortedDeltas( doc ); - const doc_id: DocumentId = doc.id; - const bucket = doc.data; - const last_updated_ts = doc.lastUpdate; + const deltas = this.getTimestampSortedDeltas( doc ); + const doc_id = doc.id; + const bucket = doc.data; + const ratedata = doc.ratedata; + const last_updated_ts = doc.lastUpdate; - return this._processNextDelta( deltas, bucket, doc_id ) + return this._processNextDelta( doc_id, deltas, bucket, ratedata ) .then( _ => this._dao.markDocumentAsProcessed( doc_id, last_updated_ts ) ) .then( _ => { - this._emitter.emit( - 'document-processed', - 'Deltas on document ' + doc_id + ' processed ' - + 'successfully. Document has been marked as ' - + 'completely processed.' - ); + this._emitter.emit( 'document-processed', { doc_id: doc_id } ); } ) .catch( e => { - this._emitter.emit( 'delta-err', e ); - this._dao.setErrorFlag( doc_id ); + this._emitter.emit( 'error', e ); + return this._dao.setErrorFlag( doc_id ); } ); } private _processNextDelta( - deltas: DeltaResult[], - bucket: Record, - doc_id: DocumentId, + doc_id: DocumentId, + deltas: Delta[], + bucket: Record, + ratedata?: Record, ): Promise { - if ( deltas.length === 0 ) - { - return Promise.resolve(); - } - const delta = deltas.shift(); if ( !delta ) { - return Promise.reject( new Error( 'Undefined delta' ) ); + return Promise.resolve(); } const delta_uid = doc_id + '_' + delta.timestamp + '_' + delta.type; this._emitter.emit( 'delta-process-start', delta_uid ); - const new_bucket = applyDelta( bucket, delta.data ); + if ( delta.type == this.DELTA_DATA ) + { + bucket = applyDelta( bucket, delta.data ); + } + else + { + ratedata = applyDelta( ratedata, delta.data ); + } - return this._publisher.publish( delta, new_bucket, doc_id ) + return this._publisher.publish( doc_id, delta, bucket, ratedata ) .then( _ => this._dao.advanceDeltaIndex( doc_id, delta.type ) ) .then( _ => this._emitter.emit( 'delta-process-end', delta_uid ) ) - .then( _ => this._processNextDelta( deltas, new_bucket, doc_id ) ); + .then( _ => this._processNextDelta( + doc_id, + deltas, + bucket, + ratedata + ) ); } @@ -144,7 +153,7 @@ export class DeltaProcessor * * @return a list of deltas sorted by timestamp */ - getTimestampSortedDeltas( doc: any ): DeltaResult[] + getTimestampSortedDeltas( doc: DeltaDocument ): Delta[] { const data_deltas = this.getDeltas( doc, this.DELTA_RATEDATA ); const ratedata_deltas = this.getDeltas( doc, this.DELTA_DATA ); @@ -164,10 +173,10 @@ export class DeltaProcessor * * @return a trimmed list of deltas */ - getDeltas( doc: any, type: DeltaType ): DeltaResult[] + getDeltas( doc: DeltaDocument, type: DeltaType ): Delta[] { - const deltas_obj = doc.rdelta || {}; - const deltas: DeltaResult[] = deltas_obj[ type ] || []; + const deltas_obj = doc.rdelta || >{}; + const deltas: Delta[] = deltas_obj[ type ] || []; // Get type specific delta index let published_count = 0; @@ -197,7 +206,7 @@ export class DeltaProcessor * * @return a sort value */ - private _sortByTimestamp( a: DeltaResult, b: DeltaResult ): number + private _sortByTimestamp( a: Delta, b: Delta ): number { if ( a.timestamp < b.timestamp ) { diff --git a/src/system/DeltaPublisher.ts b/src/system/DeltaPublisher.ts index a5b68b8..94c925c 100644 --- a/src/system/DeltaPublisher.ts +++ b/src/system/DeltaPublisher.ts @@ -21,37 +21,22 @@ * Publish delta message to a queue */ -import { AmqpPublisher, AmqpConfig } from './AmqpPublisher'; -import { DeltaResult } from '../bucket/delta'; +import { AmqpPublisher } from './AmqpPublisher'; +import { Delta } from '../bucket/delta'; import { EventEmitter } from "events"; import { DocumentId } from '../document/Document'; import { context } from '../error/ContextError'; import { AmqpError } from '../error/AmqpError'; -import { - connect as amqpConnect, - Channel, - Connection, -} from 'amqplib'; +import { AvroSchema, AvroEncoderCtr } from './avro/AvroFactory'; +import { AmqpConnection } from './amqp/AmqpConnection'; + const avro = require( 'avro-js' ); - -export interface AvroSchema { - /** Write data to a buffer */ - toBuffer( data: Record ): Buffer | null; -} - - export class DeltaPublisher implements AmqpPublisher { - /** The amqp connection */ - private _conn?: Connection; - - /** The amqp channel */ - private _channel?: Channel; - /** The avro schema */ - private _type?: AvroSchema; + private _schema: AvroSchema; /** The path to the avro schema */ readonly SCHEMA_PATH = __dirname + '/avro/schema.avsc'; @@ -66,122 +51,45 @@ export class DeltaPublisher implements AmqpPublisher /** * Delta publisher * - * @param _conf - amqp configuration - * @param _emitter - event emitter instance - * @param _ts_ctr - a timestamp constructor + * @param _emitter - event emitter instance + * @param _ts_ctr - a timestamp constructor + * @param _encoder_ctr - a factory function to create an avro encoder + * @param _conn - the amqp connection */ constructor( - private readonly _conf: AmqpConfig, - private readonly _emitter: EventEmitter, - private readonly _ts_ctr: () => UnixTimestamp, + private readonly _emitter: EventEmitter, + private readonly _ts_ctr: () => UnixTimestamp, + private readonly _encoder_ctr: AvroEncoderCtr, + private readonly _conn: AmqpConnection, ) { - this._type = avro.parse( this.SCHEMA_PATH ); - } - - - /** - * Initialize connection - */ - connect(): Promise - { - return amqpConnect( this._conf ) - .then( conn => - { - this._conn = conn; - - // If there is an error, attempt to reconnect - this._conn.on( 'error', e => - { - this._emitter.emit( 'amqp-conn-error', e ); - this._reconnect(); - } ); - - return this._conn.createChannel(); - } ) - .then( ( ch: Channel ) => - { - this._channel = ch; - - this._channel.assertExchange( - this._conf.exchange, - 'fanout', - { durable: true } - ); - } ); - } - - - /** - * Attempt to re-establish the connection - * - * @return Whether reconnecting was successful - */ - private _reconnect( retry_count: number = 0 ): void - { - if ( retry_count >= this._conf.retries ) - { - this._emitter.emit( - 'amqp-reconnect-fail', - 'Could not re-establish AMQP connection.' - ); - - return; - } - - this._emitter.emit( - 'amqp-reconnect', - '...attempting to re-establish AMQP connection' - ); - - this.connect() - .then( _ => - { - this._emitter.emit( - 'amqp-reconnect', - 'AMQP re-connected' - ); - } ) - .catch( _ => - { - const wait_ms = this._conf.retry_wait; - setTimeout( () => this._reconnect( ++retry_count ), wait_ms ); - } ); - } - - - /** - * Close the amqp conenction - */ - close(): void - { - if ( this._conn ) - { - this._conn.close.bind(this._conn); - } + this._schema = avro.parse( this.SCHEMA_PATH ); } /** * Publish quote message to exchange post-rating * - * @param delta - The delta - * @param bucket - The bucket - * @param doc_id - The doc_id + * @param doc_id - The doc_id + * @param delta - The delta + * @param bucket - The bucket + * @param ratedata - The ratedata bucket */ - publish( - delta: DeltaResult, - bucket: Record, - doc_id: DocumentId, + publish( + doc_id: DocumentId, + delta: Delta, + bucket: Record, + ratedata: Record = {}, ): Promise { - return this.sendMessage( delta, bucket, doc_id ) + return this._sendMessage( doc_id, delta, bucket, ratedata ) .then( _ => { this._emitter.emit( 'delta-publish', - "Published " + delta.type + " delta with ts '" - + delta.timestamp + "' to '" + this._conf.exchange - + '" exchange', + { + delta: delta, + exchange: this._conn.getExchangeName(), + } ); } ); } @@ -190,131 +98,155 @@ export class DeltaPublisher implements AmqpPublisher /** * Send message to exchange * - * @param delta - The delta to publish - * @param bucket - The bucket - * @param doc_id - The doc_id + * @param doc_id - The doc_id + * @param delta - The delta to publish + * @param bucket - The bucket + * @param ratedata - The ratedata bucket * * @return whether publish was successful */ - sendMessage( - delta: DeltaResult, - bucket: Record, - doc_id: DocumentId, + private _sendMessage( + doc_id: DocumentId, + delta: Delta, + bucket: Record, + ratedata: Record, ): Promise { - return new Promise( ( resolve, reject ) => - { - const ts = this._ts_ctr(); - const headers = { version: 1, created: ts }; - const avro_object = this.avroFormat( delta, bucket, doc_id, ts ); - const avro_buffer = this.avroEncode( avro_object ); + const ts = this._ts_ctr(); + const headers = { version: 1, created: ts }; + const avro_object = this._avroFormat( + ts, + doc_id, + delta, + bucket, + ratedata, + ); - if ( !this._conn ) + return this.avroEncode( avro_object ) + .then( ( avro_buffer ) => { - reject( context ( - new AmqpError( 'Error sending message: No connection' ), - { - doc_id: doc_id, - delta_type: delta.type, - delta_ts: delta.ts, - }, - ) ); - return; - } - else if ( !this._channel ) - { - reject( context ( - new AmqpError( 'Error sending message: No channel' ), - { - doc_id: doc_id, - delta_type: delta.type, - delta_ts: delta.ts, - }, - ) ); - return; - } - else if ( !avro_buffer ) - { - reject( context ( - new Error( 'Error sending message: No avro buffer' ), - { - doc_id: doc_id, - delta_type: delta.type, - delta_ts: delta.ts, - }, - ) ); - return; - } + const channel = this._conn.getAmqpChannel(); - // we don't use a routing key; fanout exchange - const published_successfully = this._channel.publish( - this._conf.exchange, - '', - avro_buffer, - { headers: headers }, - ); - - if ( published_successfully ) - { - resolve(); - return; - } - - reject( context( - new Error ( 'Error sending message: publishing failed' ), + if ( !channel ) { - doc_id: doc_id, - delta_type: delta.type, - delta_ts: delta.ts, + return Promise.reject( context ( + new AmqpError( 'Error sending message: No channel' ), + { + doc_id: doc_id, + delta_type: delta.type, + delta_ts: delta.timestamp, + }, + ) ); } - ) ); - } ); + + // we don't use a routing key; fanout exchange + const published_successfully = channel.publish( + this._conn.getExchangeName(), + '', + avro_buffer, + { headers: headers }, + ); + + if ( !published_successfully ) + { + return Promise.reject( context( + new Error ( 'Delta publish failed' ), + { + doc_id: doc_id, + delta_type: delta.type, + delta_ts: delta.timestamp, + } + ) ); + } + + return Promise.resolve(); + } ); } - avroFormat( - delta: DeltaResult, - _bucket: Record, - doc_id: DocumentId, - ts: UnixTimestamp, + /** + * Throw an error with specific information if the schema is invalid + * + * @param schema - Avro schema + * @param data - Data to encode + */ + private _assertValidAvro( + schema: AvroSchema, + data: Record, + ): void + { + schema.isValid( data, { errorHook: hook } ); + + function hook( keys: any, vals: any) { + throw context( new Error( 'Invalid Avro Schema' ), + { + invalid_paths: keys, + invalid_data: vals, + } + ); + } + } + + + /** + * Format the avro data with data type labels + * + * @param ts - a timestamp + * @param doc_id - the document id + * @param delta - the current delta + * @param bucket - the data bucket + * @param ratedata - the ratedata bucket + * + * @return the formatted data + */ + private _avroFormat( + ts: UnixTimestamp, + doc_id: DocumentId, + delta: Delta, + bucket: Record, + ratedata: Record, ): any { - const delta_data = this.setDataTypes( delta.data ); - const event_id = this.DELTA_MAP[ delta.type ]; + const delta_formatted = this.setDataTypes( delta.data ); + const bucket_formatted = this.setDataTypes( bucket ); + const ratedata_formatted = this.setDataTypes( ratedata ); + const event_id = this.DELTA_MAP[ delta.type ]; return { event: { - id: event_id, - ts: ts, + id: event_id, + ts: ts, actor: 'SERVER', - step: null, + step: null, }, document: { - id: doc_id - }, - session: { - entity_name: 'Foobar', // Fix - entity_id: 123123, // Fix + id: doc_id }, data: { Data: { - bucket: _bucket, + bucket: bucket_formatted, + }, + }, + ratedata: { + Data: { + bucket: ratedata_formatted, }, }, delta: { Data: { - bucket: delta_data, + bucket: delta_formatted, }, }, program: { Program: { id: 'quote_server', - version: 'dadaddwafdwa', // Fix + version: '', }, }, } } + /** * Encode the data in an avro buffer * @@ -322,33 +254,28 @@ export class DeltaPublisher implements AmqpPublisher * * @return the avro buffer or null if there is an error */ - avroEncode( data: Record ): Buffer | null + avroEncode( data: Record ): Promise { - let buffer = null; - - try + return new Promise( ( resolve, reject ) => { - if ( !this._type ) + const bufs: Buffer[] = []; + + try { - this._emitter.emit( - 'avro-err', - 'No avro scheama found', - ); + this._assertValidAvro( this._schema, data ) - return null; + const encoder = this._encoder_ctr( this._schema ) + + encoder.on('data', ( buf: Buffer ) => { bufs.push( buf ) } ) + encoder.on('error', ( err: Error ) => { reject( err ); } ) + encoder.on('end', () => { resolve( Buffer.concat( bufs ) ) } ) + encoder.end( data ); } - - buffer = this._type.toBuffer( data ); - } - catch( e ) - { - this._emitter.emit( - 'avro-err', - 'Error encoding data to avro: ' + e, - ); - } - - return buffer; + catch ( e ) + { + reject( e ); + } + } ); } @@ -365,7 +292,7 @@ export class DeltaPublisher implements AmqpPublisher switch( typeof( data ) ) { - case 'object': // Typescript treats arrays as objects + case 'object': if ( data == null ) { return null; diff --git a/src/system/EventLogger.ts b/src/system/EventLogger.ts deleted file mode 100644 index ef46aca..0000000 --- a/src/system/EventLogger.ts +++ /dev/null @@ -1,151 +0,0 @@ -/** - * Event logger - * - * Copyright (C) 2010-2019 R-T Specialty, LLC. - * - * This file is part of liza. - * - * liza is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - * - * PSR-12 style logger based on node events - */ - -import { EventEmitter } from "events"; - -enum LogLevel { - DEBUG, - INFO, - NOTICE, - WARNING, - ERROR, - CRITICAL, - ALERT, - EMERGENCY, -}; - -declare type StructuredLog = { - message: string; - timestamp: UnixTimestamp; - service: string; - env: string; - severity: string; -} - -export class EventLogger -{ - /** - * Initialize logger - * - * @param _env - The environment ( dev, test, demo, live ) - * @param _emitter - An event emitter - * @param _ts_ctr - a timestamp constructor - */ - constructor( - private readonly _console: Console, - private readonly _env: string, - private readonly _emitter: EventEmitter, - private readonly _ts_ctr: () => UnixTimestamp, - ) { - this.init(); - } - - - /** - * Initialize the logger to look for specific events - */ - init(): void - { - this._registerEvent( 'document-processed', LogLevel.NOTICE ); - this._registerEvent( 'delta-publish', LogLevel.NOTICE ); - this._registerEvent( 'amqp-conn-error', LogLevel.WARNING ); - this._registerEvent( 'amqp-reconnect', LogLevel.WARNING ); - this._registerEvent( 'amqp-reconnect-fail', LogLevel.ERROR ); - this._registerEvent( 'avro-err', LogLevel.ERROR ); - this._registerEvent( 'dao-err', LogLevel.ERROR ); - this._registerEvent( 'publish-err', LogLevel.ERROR ); - - // this._registerEvent( 'log', LogLevel.INFO ); - // this._registerEvent( 'debug', LogLevel.DEBUG ); - // this._registerEvent( 'info', LogLevel.INFO ); - // this._registerEvent( 'notice', LogLevel.NOTICE ); - // this._registerEvent( 'warning', LogLevel.WARNING ); - // this._registerEvent( 'error', LogLevel.ERROR ); - // this._registerEvent( 'critical', LogLevel.CRITICAL ); - // this._registerEvent( 'alert', LogLevel.ALERT ); - // this._registerEvent( 'emergency', LogLevel.EMERGENCY ); - } - - - /** - * Register an event at a specific log level - * - * @param event_id - the event id - * @param level - the log level - */ - private _registerEvent( event_id: string, level: LogLevel ): void - { - const logF = this._getLogLevelFunction( level ) - - this._emitter.on( event_id, logF ); - } - - - /** - * Get a logging function for the specified log level - * - * @param event_id - the event id - * - * @return the function to log with - */ - private _getLogLevelFunction( level: LogLevel ): ( str: string ) => void - { - switch( level ) - { - case LogLevel.DEBUG: - case LogLevel.INFO: - return ( str ) => this._console.info( this._format( str, level ) ); - case LogLevel.NOTICE: - return ( str ) => this._console.log( this._format( str, level ) ); - case LogLevel.WARNING: - return ( str ) => this._console.warn( this._format( str, level ) ); - case LogLevel.ERROR: - case LogLevel.CRITICAL: - case LogLevel.ALERT: - case LogLevel.EMERGENCY: - return ( str ) => this._console.error( this._format( str, level ) ); - default: - return ( str ) => this._console.log( "UNKNOWN LOG LEVEL: " + str ); - } - } - - - /** - * Get structured log object - * - * @param str - the string to log - * @param level - the log level - * - * @returns a structured logging object - */ - private _format( str: string, level: LogLevel ): StructuredLog - { - return { - message: str, - timestamp: this._ts_ctr(), - service: 'quote-server', - env: this._env, - severity: LogLevel[level], - }; - } -} diff --git a/src/system/EventMediator.ts b/src/system/EventMediator.ts new file mode 100644 index 0000000..31b245e --- /dev/null +++ b/src/system/EventMediator.ts @@ -0,0 +1,88 @@ +/** + * Event Meditator + * + * Copyright (C) 2010-2019 R-T Specialty, LLC. + * + * This file is part of liza. + * + * liza is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + * Hook events and log them + */ + +import { EventEmitter } from 'events'; +import { PsrLogger } from './PsrLogger'; +import { hasContext } from '../error/ContextError'; + +export class EventMediator +{ + /** + * Initialize mediator + * + * @param _log - A PSR-3 style logger + * @param _emitter - An event emitter + */ + constructor( + private readonly _log: PsrLogger, + private readonly _emitter: EventEmitter, + ) { + this._emitter.on( 'delta-publish', ( msg ) => this._log.notice( + 'Published delta to exchange', + msg + ) ); + + this._emitter.on( 'document-processed', ( msg ) => this._log.notice( + 'Deltas on document processed successfully. Document has been ' + + 'marked as completely processed.', + msg + ) ); + + this._emitter.on( 'amqp-conn-error', ( msg ) => + this._log.warning( 'AMQP Connection Error', msg ) ); + + this._emitter.on( 'amqp-reconnect', () => + this._log.warning( + '...attempting to re-establish AMQP connection' + ) + ); + + this._emitter.on( 'amqp-reconnected', () => + this._log.warning( + 'AMQP re-connected' + ) + ); + + this._emitter.on( 'error', ( arg ) => + this._handleError( arg ) ); + } + + + private _handleError( e: any ): void + { + let msg: string = ''; + let context = {}; + + if ( e instanceof( Error ) ) + { + msg = e.message; + + if ( hasContext( e ) ) + { + context = e.context; + } + } + + this._log.error( msg, context ); + } +} diff --git a/src/system/MetricsCollector.ts b/src/system/MetricsCollector.ts index 30c4ea2..bc4a6cc 100644 --- a/src/system/MetricsCollector.ts +++ b/src/system/MetricsCollector.ts @@ -21,27 +21,12 @@ * Collect Metrics for Prometheus */ -import { DeltaDao } from "./db/DeltaDao"; import { Histogram, Pushgateway, Counter, Gauge } from 'prom-client'; import { EventEmitter } from "events"; -import { PrometheusFactory } from './PrometheusFactory'; +import { PrometheusFactory, PrometheusConfig } from './PrometheusFactory'; const client = require( 'prom-client' ) -export declare type PrometheusConfig = { - /** The hostname to connect to */ - hostname: string; - - /** The port to connect to */ - port: number; - - /** The environment ( dev, test, demo, live ) */ - env: string; - - /** The rate (in milliseconds) at which metrics are pushed */ - push_interval_ms: number; -} - export type MetricTimer = ( _start_time?: [ number, number ] @@ -78,6 +63,8 @@ export class MetricsCollector /** Timing map */ private _timing_map: Record = {}; + private _push_interval: NodeJS.Timer; + /** * Initialize delta logger @@ -133,8 +120,7 @@ export class MetricsCollector ); // Push metrics on a specific interval - setInterval( - () => + this._push_interval = setInterval( () => { this._gateway.pushAdd( { jobName: 'liza_delta_metrics' }, this.pushCallback @@ -147,6 +133,15 @@ export class MetricsCollector } + /** + * Stop the push interval + */ + stop(): void + { + clearInterval( this._push_interval ); + } + + /** * List to events to update metrics */ @@ -154,10 +149,7 @@ export class MetricsCollector { this._emitter.on( 'delta-process-start', - ( uid: string ) => - { - this._timing_map[ uid ] = this._timer(); - } + ( uid: string ) => { this._timing_map[ uid ] = this._timer(); } ); this._emitter.on( @@ -166,7 +158,7 @@ export class MetricsCollector { const start_time_ms = this._timing_map[ uid ] || [ -1, -1 ]; const t = this._timer( start_time_ms ); - const total_time_ms = ( ( t[ 0 ] * 1000 ) + ( t[ 1 ] / 1000 ) ); + const total_time_ms = t[ 0 ] * 1000 + t[ 1 ] / 1000000; this._process_time.observe( total_time_ms ); this._total_processed.inc(); @@ -188,27 +180,23 @@ export class MetricsCollector * @param body - The resposne body */ private pushCallback( - _error?: Error | undefined, + error?: Error | undefined, _response?: any, _body?: any ): void { - console.log( 'Push callback' ); - console.error( _error ); + if ( error ) + { + this._emitter.emit( 'error', error ); + } } /** - * Look for mongodb delta errors and update metrics if found - * - * @return any errors the occurred + * Update metrics with current error count */ - checkForErrors( dao: DeltaDao ): NullableError + updateErrorCount( count: number ): void { - dao.getErrorCount() - .then( count => { this._current_error.set( +count ); } ) - .catch( err => { return err; } ); - - return null; + this._current_error.set( +count ); } } diff --git a/src/system/PrometheusFactory.ts b/src/system/PrometheusFactory.ts index 088612e..0cf059f 100644 --- a/src/system/PrometheusFactory.ts +++ b/src/system/PrometheusFactory.ts @@ -22,6 +22,42 @@ */ import { Pushgateway, Histogram, Counter, Gauge } from 'prom-client'; + +export declare type PrometheusConfig = { + /** The hostname to connect to */ + hostname: string; + + /** The port to connect to */ + port: number; + + /** The environment ( dev, test, demo, live ) */ + env: string; + + /** The rate (in milliseconds) at which metrics are pushed */ + push_interval_ms: number; +} + + +/** + * Create a prometheus configuration from the environment + * + * @param env - the environment variables + * + * @return the prometheus configuration + */ +export function createPrometheusConfig( + env: NodeJS.ProcessEnv +): PrometheusConfig +{ + return { + 'hostname': env.prom_hostname, + 'port': +( env.prom_port || 0 ), + 'env': process.env.NODE_ENV, + 'push_interval_ms': +( process.env.prom_push_interval_ms || 5000 ), + }; +} + + export class PrometheusFactory { /** diff --git a/src/system/PsrLogger.ts b/src/system/PsrLogger.ts new file mode 100644 index 0000000..276c78e --- /dev/null +++ b/src/system/PsrLogger.ts @@ -0,0 +1,117 @@ +/** + * PSR logger + * + * Copyright (C) 2010-2019 R-T Specialty, LLC. + * + * This file is part of liza. + * + * liza is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + * PSR-3 style logger + */ + +export enum LogLevel { + DEBUG, + INFO, + NOTICE, + WARNING, + ERROR, + CRITICAL, + ALERT, + EMERGENCY, +}; + + +export interface PsrLogger +{ + /** + * Log at a debug level + * + * @param msg - the message to log + * @param context - additional message context + */ + debug( msg: string | object, context?: object ): void + + + /** + * Log at a debug level + * + * @param msg - the message to log + * @param context - additional message context + */ + info( msg: string | object, context?: object ): void + + + /** + * Log at a debug level + * + * @param msg - the message to log + * @param context - additional message context + */ + notice( msg: string | object, context?: object ): void + + + /** + * Log at a debug level + * + * @param msg - the message to log + * @param context - additional message context + */ + warning( msg: string | object, context?: object ): void + + + /** + * Log at a debug level + * + * @param msg - the message to log + * @param context - additional message context + */ + error( msg: string | object, context?: object ): void + + + /** + * Log at a debug level + * + * @param msg - the message to log + * @param context - additional message context + */ + critical( msg: string | object, context?: object ): void + + + /** + * Log at a debug level + * + * @param msg - the message to log + * @param context - additional message context + */ + alert( msg: string | object, context?: object ): void + + + /** + * Log at a debug level + * + * @param msg - the message to log + * @param context - additional message context + */ + emergency( msg: string | object, context?: object ): void + + + /** + * Log a message + * + * @param msg - the message to log + * @param context - additional message context + */ + log( level: LogLevel, msg: string | object, context?: object ): void +} diff --git a/src/system/StandardLogger.ts b/src/system/StandardLogger.ts new file mode 100644 index 0000000..d69c3d4 --- /dev/null +++ b/src/system/StandardLogger.ts @@ -0,0 +1,198 @@ +/** + * Stdout logger + * + * Copyright (C) 2010-2019 R-T Specialty, LLC. + * + * This file is part of liza. + * + * liza is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + * Standard out logger implementing PSR-3 standards + */ +import { PsrLogger, LogLevel } from './PsrLogger'; + +declare type StructuredLog = { + message: string; + timestamp: UnixTimestamp; + service: string; + env: string; + severity: string; + context?: Record; +} + +export class StandardLogger implements PsrLogger +{ + /** + * Initialize logger + * + * @param _console + * @param _ts_ctr - a timestamp constructor + * @param _env - The environment ( dev, test, demo, live ) + */ + constructor( + private readonly _console: Console, + private readonly _ts_ctr: () => UnixTimestamp, + private readonly _env: string, + ) {} + + + /** + * Log at a debug level + * + * @param msg - the message to log + * @param context - additional message context + */ + debug( msg: string | object, context?: object ): void + { + this._console.info( this._format( LogLevel.DEBUG, msg, context ) ); + } + + + /** + * Log at a debug level + * + * @param msg - the message to log + * @param context - additional message context + */ + info( msg: string | object, context?: object ): void + { + this._console.info( this._format( LogLevel.INFO, msg, context ) ); + } + + + /** + * Log at a debug level + * + * @param msg - the message to log + * @param context - additional message context + */ + notice( msg: string | object, context?: object ): void + { + this._console.log( this._format( LogLevel.NOTICE, msg, context ) ); + } + + + /** + * Log at a debug level + * + * @param msg - the message to log + * @param context - additional message context + */ + warning( msg: string | object, context?: object ): void + { + this._console.warn( this._format( LogLevel.WARNING, msg, context ) ); + } + + + /** + * Log at a debug level + * + * @param msg - the message to log + * @param context - additional message context + */ + error( msg: string | object, context?: object ): void + { + this._console.error( this._format( LogLevel.ERROR, msg, context ) ); + } + + + /** + * Log at a debug level + * + * @param msg - the message to log + * @param context - additional message context + */ + critical( msg: string | object, context?: object ): void + { + this._console.error( this._format( LogLevel.CRITICAL, msg, context ) ); + } + + + /** + * Log at a debug level + * + * @param msg - the message to log + * @param context - additional message context + */ + alert( msg: string | object, context?: object ): void + { + this._console.error( this._format( LogLevel.ALERT, msg, context ) ); + } + + + /** + * Log at a debug level + * + * @param msg - the message to log + * @param context - additional message context + */ + emergency( msg: string | object, context?: object ): void + { + this._console.error( this._format( LogLevel.EMERGENCY, msg, context ) ); + } + + + /** + * Log a message + * + * @param msg - the message to log + * @param context - additional message context + */ + log( level: LogLevel, msg: string | object, context?: object ): void + { + this._console.error( this._format( level, msg, context ) ); + } + + + /** + * Get structured log object + * + * @param msg - the string or object to log + * @param level - the log level + * + * @returns a structured logging object + */ + private _format( + level: LogLevel, + msg: string | object, + context: object = {}, + ): StructuredLog + { + let str: string; + + if ( msg !== null && typeof( msg ) === 'object' ) + { + str = JSON.stringify( msg ); + } + else + { + str = msg; + } + + const structured_log = { + message: str, + timestamp: this._ts_ctr(), + service: 'quote-server', + env: this._env, + severity: LogLevel[level], + }; + + if ( Object.keys( context ).length > 0 ) + { + structured_log[ "context" ] = context; + } + + return structured_log; + } +} diff --git a/src/system/amqp/AmqpConnection.ts b/src/system/amqp/AmqpConnection.ts new file mode 100644 index 0000000..13b9791 --- /dev/null +++ b/src/system/amqp/AmqpConnection.ts @@ -0,0 +1,154 @@ +/** + * Amqp Connection + * + * Copyright (C) 2010-2019 R-T Specialty, LLC. + * + * This file is part of liza. + * + * liza is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + * Amqp Connection + */ +import { AmqpConfig } from '../AmqpPublisher'; +import { EventEmitter } from "events"; +import { + connect as AmqpConnect, + Channel, + Connection, +} from 'amqplib'; + + +export class AmqpConnection +{ + /** The amqp connection */ + private _conn?: Connection; + + /** The amqp channel */ + private _channel?: Channel; + + + /** + * Amqp Connection + * + * @param _conf - amqp configuration + * @param _emitter - event emitter instance + */ + constructor( + private readonly _conf: AmqpConfig, + private readonly _emitter: EventEmitter, + ) {} + + + /** + * Initialize connection + */ + connect(): Promise + { + return AmqpConnect( this._conf ) + .then( conn => + { + this._conn = conn; + + /** If there is an error, attempt to reconnect + * Only hook this once because it will be re-hooked on each + * successive successful connection + */ + this._conn.once( 'error', e => + { + this._emitter.emit( 'amqp-conn-error', e ); + this._reconnect(); + } ); + + return this._conn.createChannel(); + } ) + .then( ( ch: Channel ) => + { + this._channel = ch; + + this._channel.assertExchange( + this._conf.exchange, + 'fanout', + { durable: true } + ); + } ); + } + + + /** + * Attempt to re-establish the connection + * + * @param retry_count - the number of retries attempted + */ + private _reconnect( retry_count: number = 0 ): void + { + if ( retry_count >= this._conf.retries ) + { + this._emitter.emit( + 'error', + new Error( 'Could not re-establish AMQP connection.' ) + ); + + return; + } + + this._emitter.emit( 'amqp-reconnect' ); + + this.connect() + .then( _ => { this._emitter.emit( 'amqp-reconnected' ) } ) + .catch( _ => + { + const wait_ms = this._conf.retry_wait; + setTimeout( () => this._reconnect( ++retry_count ), wait_ms ); + } ); + } + + + /** + * Returns the exchange to publish to + * + * @return exchange name + */ + getExchangeName(): string + { + return this._conf.exchange; + } + + + /** + * Returns the amqp channel + * + * @return exchange name + */ + getAmqpChannel(): Channel | undefined + { + if ( !this._channel ) + { + this._reconnect(); + } + + return this._channel; + } + + + /** + * Close the amqp conenction + */ + close(): void + { + if ( this._conn ) + { + this._conn.close.bind(this._conn); + } + } +} \ No newline at end of file diff --git a/src/system/avro/AvroFactory.ts b/src/system/avro/AvroFactory.ts new file mode 100644 index 0000000..0a07a32 --- /dev/null +++ b/src/system/avro/AvroFactory.ts @@ -0,0 +1,90 @@ +/** + * Factory functions for avro + * + * Copyright (C) 2010-2019 R-T Specialty, LLC. + * + * This file is part of the Liza Data Collection Framework. + * + * liza is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +import { Duplex } from 'stream'; + +const avro = require( 'avro-js' ); + + +export interface AvroSchema +{ + /** + * Write data to a buffer + * + * @param data - the data to write + * + * @return the buffer if successful + */ + toBuffer( data: Record ): Buffer | null; + + + /** + * Check if data is valid against schema + * + * @param data - the data to validate + * @param opts - options specified as key/values + * + * @return the buffer if it is valid + */ + isValid( + data: Record, + opts?: Record + ): Buffer | null; + + + /** + * Write to a buffer + * + * @param data - the data to write + * @param buffer - the buffer that will be written to + */ + encode( data: Record, buffer: Buffer ): void; + + + /** + * Output to a json string + * + * @param data - the data to format + * + * @return the formatted data + */ + toString( data: Record ): string; + + + /** + * Deserialize from a buffer + * + * @param buffer - the buffer to read from + * + * @return the resulting data + */ + fromBuffer( buffer: Buffer ): any; +} + + +/** The avro encoder constructor type */ +export type AvroEncoderCtr = ( type: AvroSchema ) => Duplex; + + +/** The avro encoder constructor */ +export function createAvroEncoder( schema: AvroSchema ): Duplex +{ + return new avro.streams.BlockEncoder( schema ); +} \ No newline at end of file diff --git a/src/system/avro/schema.avsc b/src/system/avro/schema.avsc index 4a9a609..53e4a9d 100644 --- a/src/system/avro/schema.avsc +++ b/src/system/avro/schema.avsc @@ -77,23 +77,6 @@ ] } }, - { - "name": "session", - "type": { - "type": "record", - "name": "Session", - "fields": [ - { - "name": "entity_name", - "type": "string" - }, - { - "name": "entity_id", - "type": "int" - } - ] - } - }, { "name": "data", "type": [ @@ -160,6 +143,13 @@ } ] }, + { + "name": "ratedata", + "type": [ + "null", + "Data" + ] + }, { "name": "delta", "type": [ @@ -191,4 +181,3 @@ } ] } - diff --git a/src/system/db/DeltaDao.ts b/src/system/db/DeltaDao.ts index d7d2544..881bc79 100644 --- a/src/system/db/DeltaDao.ts +++ b/src/system/db/DeltaDao.ts @@ -28,6 +28,7 @@ */ import { DocumentId } from "../../document/Document"; +import { DeltaDocument } from "../../bucket/delta"; /** Manage deltas */ @@ -38,7 +39,7 @@ export interface DeltaDao * * @return documents in need of processing */ - getUnprocessedDocuments(): Promise[]> + getUnprocessedDocuments(): Promise /** @@ -46,8 +47,6 @@ export interface DeltaDao * * @param doc_id - Document whose index will be set * @param type - Delta type - * - * @return any errors that occurred */ advanceDeltaIndex( doc_id: DocumentId, @@ -61,8 +60,6 @@ export interface DeltaDao * * @param doc_id - The document to mark * @param last_update_ts - The last time this document was updated - * - * @return any errors that occurred */ markDocumentAsProcessed( doc_id: DocumentId, diff --git a/src/system/db/MongoDeltaDao.ts b/src/system/db/MongoDeltaDao.ts index 4ab4e91..8d45684 100644 --- a/src/system/db/MongoDeltaDao.ts +++ b/src/system/db/MongoDeltaDao.ts @@ -26,7 +26,7 @@ import { DeltaDao } from './DeltaDao'; import { MongoCollection } from 'mongodb'; import { context } from '../../error/ContextError'; import { DaoError } from '../../error/DaoError'; -import { DeltaType } from '../../bucket/delta'; +import { DeltaType, DeltaDocument } from '../../bucket/delta'; /** Manage deltas */ export class MongoDeltaDao implements DeltaDao @@ -37,11 +37,21 @@ export class MongoDeltaDao implements DeltaDao /** The data delta type */ static readonly DELTA_DATA: string = 'data'; + /** The document fields to read */ + readonly RESULT_FIELDS: Record = { + id: 1, + lastUpdate: 1, + data: 1, + ratedata: 1, + rdelta: 1, + totalPublishDelta: 1, + }; + /** * Initialize connection * - * @param _db Mongo db + * @param _collection - Mongo db collection */ constructor( private readonly _collection: MongoCollection, @@ -53,7 +63,7 @@ export class MongoDeltaDao implements DeltaDao * * @return documents in need of processing */ - getUnprocessedDocuments(): Promise[]> + getUnprocessedDocuments(): Promise { return new Promise( ( resolve, reject ) => { @@ -62,7 +72,7 @@ export class MongoDeltaDao implements DeltaDao published: false, deltaError: false, }, - {}, + { fields: this.RESULT_FIELDS }, ( e, cursor ) => { if ( e ) @@ -75,7 +85,7 @@ export class MongoDeltaDao implements DeltaDao return } - cursor.toArray( ( e: Error, data: any[] ) => + cursor.toArray( ( e: Error, data: DeltaDocument[] ) => { if ( e ) { @@ -100,13 +110,8 @@ export class MongoDeltaDao implements DeltaDao * * @param doc_id - Document whose index will be set * @param type - Delta type - * - * @return any errors that occurred */ - advanceDeltaIndex( - doc_id: DocumentId, - type: DeltaType, - ): Promise + advanceDeltaIndex( doc_id: DocumentId, type: DeltaType ): Promise { return new Promise( ( resolve, reject ) => { @@ -123,9 +128,7 @@ export class MongoDeltaDao implements DeltaDao if ( e ) { reject( context( - new DaoError( - 'Error advancing delta index: ' + e - ), + new DaoError( 'Error advancing delta index: ' + e ), { doc_id: doc_id, type: type, @@ -149,8 +152,6 @@ export class MongoDeltaDao implements DeltaDao * * @param doc_id - The document to mark * @param last_update_ts - The last time this document was updated - * - * @return any errors that occurred */ markDocumentAsProcessed( doc_id: DocumentId, diff --git a/src/system/db/MongoFactory.ts b/src/system/db/MongoFactory.ts new file mode 100644 index 0000000..5a3b03e --- /dev/null +++ b/src/system/db/MongoFactory.ts @@ -0,0 +1,176 @@ +/** + * Mongo Factory functions + * + * Copyright (C) 2010-2019 R-T Specialty, LLC. + * + * This file is part of the Liza Data Collection Framework. + * + * liza is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * These definitions are for a very old mongodb library, which will be + * once we get around to updating node. Quite a failure on the maintenance + * front. + * + * instantiate objects for MongoDb + */ +import { MongoDb, MongoDbConfig, MongoCollection } from '../../types/mongodb'; +import { DaoError } from '../../error/DaoError'; + + +const { + Db: MongoDb, + Server: MongoServer, + ReplServers: ReplSetServers, +} = require( 'mongodb' ); + + +/** + * Create a mongodb configuration from the environment + * + * @param env - the environment variables + * + * @return the mongo configuration + */ +export function createMongoConfig( env: NodeJS.ProcessEnv ): MongoDbConfig +{ + return { + 'port': +( env.MONGODB_PORT || 0 ), + 'ha': +( env.LIZA_MONGODB_HA || 0 ) == 1, + 'replset': env.LIZA_MONGODB_REPLSET, + 'host': env.MONGODB_HOST, + 'host_a': env.LIZA_MONGODB_HOST_A, + 'port_a': +( env.LIZA_MONGODB_PORT_A || 0 ), + 'host_b': env.LIZA_MONGODB_HOST_B, + 'port_b': +( env.LIZA_MONGODB_PORT_B || 0 ), + 'collection': 'quotes', + }; +} + + +/** + * Create the database connection + * + * @param conf - the configuration from the environment + * + * @return the mongodb connection + */ +export function createMongoDB( conf: MongoDbConfig ): MongoDb +{ + if( conf.ha ) + { + var mongodbPort = conf.port || 27017; + var mongodbReplSet = conf.replset || 'rs0'; + var dbServers = new ReplSetServers( + [ + new MongoServer( conf.host_a, conf.port_a || mongodbPort), + new MongoServer( conf.host_b, conf.port_b || mongodbPort), + ], + {rs_name: mongodbReplSet, auto_reconnect: true} + ); + } + else + { + var dbServers = new MongoServer( + conf.host || '127.0.0.1', + conf.port || 27017, + {auto_reconnect: true} + ); + } + var db = new MongoDb( + 'program', + dbServers, + {native_parser: false, safe: false} + ); + return db; +} + + +/** + * Attempts to connect to the database and retrieve the collection + * + * connectError event will be emitted on failure. + * + * @param db - the mongo database + * @param conf - the mongo configuration + * + * @return the collection + */ +export function getMongoCollection( + db: MongoDb, + conf: MongoDbConfig +): Promise +{ + return new Promise( ( resolve, reject ) => + { + // attempt to connect to the database + db.open( ( e: any, db: any ) => + { + // if there was an error, don't bother with anything else + if ( e ) + { + // in some circumstances, it may just be telling us that + // we're already connected (even though the connection may + // have been broken) + if ( e.errno !== undefined ) + { + reject( new Error( + 'Error opening mongo connection: ' + e + ) ); + return; + } + } else if ( db == null ) + { + reject( new DaoError( 'No database connection' ) ); + return; + } + + // quotes collection + db.collection( + conf.collection, + ( e: any, collection: MongoCollection ) => + { + if ( e ) + { + reject( new DaoError( + 'Error creating collection: ' + e + ) ); + return; + } + + // initialize indexes + collection.createIndex( + [ + ['published', 1], + ['deltaError', 1], + ], + true, + ( e: any, _index: { [P: string]: any } ) => + { + if ( e ) + { + reject( new DaoError( + 'Error creating index: ' + e + ) ); + return; + } + + resolve( collection ); + return; + } + ); + } + ); + } ); + } ); +} \ No newline at end of file diff --git a/src/types/mongodb.d.ts b/src/types/mongodb.d.ts index 4642223..bbbfa46 100644 --- a/src/types/mongodb.d.ts +++ b/src/types/mongodb.d.ts @@ -62,6 +62,15 @@ export interface MongoDb * @param callback continuation on completion */ close( callback: MongoCallback ): void; + + + /** + * Hook events + * + * @param event_id - the event to hook + * @param callback - a function to call in response to the event + */ + on( event_id: string, callback: ( err: Error ) => void ): void; } @@ -107,6 +116,9 @@ interface MongoFindOptions /** Whether to project only id's */ id?: number, + + /** Which fields to include in the result set */ + fields?: Record, } @@ -236,6 +248,10 @@ declare interface MongoCollection /** * Creates an index on the collection + * + * @param fieldOrSpec - indexes to create + * @param options - mongo options + * @param callback - continuation on completion */ createIndex( fieldOrSpec: MongoIndexSpecification, @@ -246,6 +262,9 @@ declare interface MongoCollection /** * Creates an index on the collection + * + * @param docs - documents to insert + * @param callback - continuation on completion */ insert( docs: MongoInsertSpecification, diff --git a/test/server/db/MongoServerDaoTest.js b/test/server/db/MongoServerDaoTest.js deleted file mode 100644 index d6c8bf1..0000000 --- a/test/server/db/MongoServerDaoTest.js +++ /dev/null @@ -1,157 +0,0 @@ -/** - * Tests MongoServerDao - * - * Copyright (C) 2010-2019 R-T Specialty, LLC. - * - * This file is part of the Liza Data Collection Framework. - * - * liza is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ -'use strict'; -Object.defineProperty(exports, "__esModule", { value: true }); -var MongoServerDao_1 = require("../../../src/server/db/MongoServerDao"); -var chai_1 = require("chai"); -chai_1.use(require('chai-as-promised')); -describe('MongoServerDao', function () { - describe('#saveQuote', function () { - describe("with no save data or push data", function () { - it("saves entire metabucket record individually", function (done) { - var metadata = { - foo: ['bar', 'baz'], - bar: [{ quux: 'quuux' }], - }; - var quote = createStubQuote(metadata); - var sut = new MongoServerDao_1.MongoServerDao(createMockDb( - // update - function (_selector, data) { - chai_1.expect(data.$set['meta.foo']) - .to.deep.equal(metadata.foo); - chai_1.expect(data.$set['meta.bar']) - .to.deep.equal(metadata.bar); - chai_1.expect(data.$push).to.equal(undefined); - done(); - })); - sut.init(function () { - return sut.saveQuote(quote, function () { }, function () { }); - }); - }); - }); - describe("with push data", function () { - it("adds push data to the collection", function (done) { - var push_data = { - foo: ['bar', 'baz'], - bar: [{ quux: 'quuux' }], - }; - var quote = createStubQuote({}); - var sut = new MongoServerDao_1.MongoServerDao(createMockDb( - // update - function (_selector, data) { - chai_1.expect(data.$push['foo']) - .to.deep.equal(push_data.foo); - chai_1.expect(data.$push['bar']) - .to.deep.equal(push_data.bar); - done(); - })); - sut.init(function () { - return sut.saveQuote(quote, function () { }, function () { }, undefined, push_data); - }); - }); - it("skips push data when it is an empty object", function (done) { - var push_data = {}; - var quote = createStubQuote({}); - var sut = new MongoServerDao_1.MongoServerDao(createMockDb( - // update - function (_selector, data) { - chai_1.expect(data.$push).to.equal(undefined); - done(); - })); - sut.init(function () { - return sut.saveQuote(quote, function () { }, function () { }, undefined, push_data); - }); - }); - }); - }); -}); -function createMockDb(on_update) { - var collection_quotes = { - update: on_update, - createIndex: function (_, __, c) { return c(); }, - }; - var collection_seq = { - find: function (_, __, c) { - c(null, { - toArray: function (c) { return c(null, { length: 5 }); }, - }); - }, - }; - var db = { - collection: function (id, c) { - var coll = (id === 'quotes') - ? collection_quotes - : collection_seq; - c(null, coll); - }, - }; - var driver = { - open: function (c) { return c(null, db); }, - on: function () { }, - }; - return driver; -} -function createStubQuote(metadata) { - var program = { - getId: function () { return '1'; }, - ineligibleLockCount: 0, - apis: {}, - internal: {}, - meta: { - arefs: {}, - fields: {}, - groups: {}, - qdata: {}, - qtypes: {}, - }, - mapis: {}, - initQuote: function () { }, - }; - var quote = { - getBucket: function () { return ({ - getData: function () { }, - }); }, - getMetabucket: function () { return ({ - getData: function () { return metadata; }, - }); }, - getId: function () { return 123; }, - getProgramVersion: function () { return 'Foo'; }, - getLastPremiumDate: function () { return 0; }, - getRatedDate: function () { return 0; }, - getExplicitLockReason: function () { return ""; }, - getExplicitLockStep: function () { return 1; }, - isImported: function () { return false; }, - isBound: function () { return false; }, - getTopVisitedStepId: function () { return 1; }, - getTopSavedStepId: function () { return 1; }, - setRatedDate: function () { return quote; }, - setRateBucket: function () { return quote; }, - setRatingData: function () { return quote; }, - getRatingData: function () { return ({ _unavailable_all: '0' }); }, - getProgram: function () { return program; }, - setExplicitLock: function () { return quote; }, - getProgramId: function () { return 'Foo'; }, - getCurrentStepId: function () { return 0; }, - setLastPremiumDate: function () { return quote; }, - }; - return quote; -} -//# sourceMappingURL=data:application/json;base64,eyJ2ZXJzaW9uIjozLCJmaWxlIjoiTW9uZ29TZXJ2ZXJEYW9UZXN0LmpzIiwic291cmNlUm9vdCI6IiIsInNvdXJjZXMiOlsiTW9uZ29TZXJ2ZXJEYW9UZXN0LnRzIl0sIm5hbWVzIjpbXSwibWFwcGluZ3MiOiJBQUFBOzs7Ozs7Ozs7Ozs7Ozs7Ozs7O0dBbUJHO0FBRUgsWUFBWSxDQUFDOztBQUViLHdFQUE4RTtBQUU5RSw2QkFBK0M7QUFRL0MsVUFBUSxDQUFFLE9BQU8sQ0FBRSxrQkFBa0IsQ0FBRSxDQUFFLENBQUM7QUFHMUMsUUFBUSxDQUFFLGdCQUFnQixFQUFFO0lBRXhCLFFBQVEsQ0FBRSxZQUFZLEVBQUU7UUFFcEIsUUFBUSxDQUFFLGdDQUFnQyxFQUFFO1lBRXhDLEVBQUUsQ0FBRSw2Q0FBNkMsRUFBRSxVQUFBLElBQUk7Z0JBRW5ELElBQU0sUUFBUSxHQUFHO29CQUNiLEdBQUcsRUFBRSxDQUFFLEtBQUssRUFBRSxLQUFLLENBQUU7b0JBQ3JCLEdBQUcsRUFBRSxDQUFFLEVBQUUsSUFBSSxFQUFFLE9BQU8sRUFBRSxDQUFFO2lCQUM3QixDQUFDO2dCQUVGLElBQU0sS0FBSyxHQUFHLGVBQWUsQ0FBRSxRQUFRLENBQUUsQ0FBQztnQkFFMUMsSUFBTSxHQUFHLEdBQUcsSUFBSSwrQkFBRyxDQUFFLFlBQVk7Z0JBQzdCLFNBQVM7Z0JBQ1QsVUFBRSxTQUF3QixFQUFFLElBQWlCO29CQUV6QyxhQUFNLENBQUUsSUFBSSxDQUFDLElBQUksQ0FBRSxVQUFVLENBQUUsQ0FBRTt5QkFDNUIsRUFBRSxDQUFDLElBQUksQ0FBQyxLQUFLLENBQUUsUUFBUSxDQUFDLEdBQUcsQ0FBRSxDQUFDO29CQUVuQyxhQUFNLENBQUUsSUFBSSxDQUFDLElBQUksQ0FBRSxVQUFVLENBQUUsQ0FBRTt5QkFDNUIsRUFBRSxDQUFDLElBQUksQ0FBQyxLQUFLLENBQUUsUUFBUSxDQUFDLEdBQUcsQ0FBRSxDQUFDO29CQUduQyxhQUFNLENBQUUsSUFBSSxDQUFDLEtBQUssQ0FBRSxDQUFDLEVBQUUsQ0FBQyxLQUFLLENBQUUsU0FBUyxDQUFFLENBQUM7b0JBRTNDLElBQUksRUFBRSxDQUFDO2dCQUNYLENBQUMsQ0FDSixDQUFFLENBQUM7Z0JBRUosR0FBRyxDQUFDLElBQUksQ0FBRTtvQkFDTixPQUFBLEdBQUcsQ0FBQyxTQUFTLENBQUUsS0FBSyxFQUFFLGNBQU8sQ0FBQyxFQUFFLGNBQU8sQ0FBQyxDQUFFO2dCQUExQyxDQUEwQyxDQUM3QyxDQUFDO1lBQ04sQ0FBQyxDQUFFLENBQUM7UUFDUixDQUFDLENBQUUsQ0FBQztRQUVKLFFBQVEsQ0FBRSxnQkFBZ0IsRUFBRTtZQUV4QixFQUFFLENBQUUsa0NBQWtDLEVBQUUsVUFBQSxJQUFJO2dCQUV4QyxJQUFNLFNBQVMsR0FBRztvQkFDZCxHQUFHLEVBQUUsQ0FBRSxLQUFLLEVBQUUsS0FBSyxDQUFFO29CQUNyQixHQUFHLEVBQUUsQ0FBRSxFQUFFLElBQUksRUFBRSxPQUFPLEVBQUUsQ0FBRTtpQkFDN0IsQ0FBQztnQkFFRixJQUFNLEtBQUssR0FBRyxlQUFlLENBQUUsRUFBRSxDQUFFLENBQUM7Z0JBRXBDLElBQU0sR0FBRyxHQUFHLElBQUksK0JBQUcsQ0FBRSxZQUFZO2dCQUM3QixTQUFTO2dCQUNULFVBQUMsU0FBd0IsRUFBRSxJQUFpQjtvQkFFeEMsYUFBTSxDQUFFLElBQUksQ0FBQyxLQUFLLENBQUUsS0FBSyxDQUFFLENBQUU7eUJBQ3hCLEVBQUUsQ0FBQyxJQUFJLENBQUMsS0FBSyxDQUFFLFNBQVMsQ0FBQyxHQUFHLENBQUUsQ0FBQztvQkFFcEMsYUFBTSxDQUFFLElBQUksQ0FBQyxLQUFLLENBQUUsS0FBSyxDQUFFLENBQUU7eUJBQ3hCLEVBQUUsQ0FBQyxJQUFJLENBQUMsS0FBSyxDQUFFLFNBQVMsQ0FBQyxHQUFHLENBQUUsQ0FBQztvQkFFcEMsSUFBSSxFQUFFLENBQUM7Z0JBQ1gsQ0FBQyxDQUNKLENBQUUsQ0FBQztnQkFFSixHQUFHLENBQUMsSUFBSSxDQUFFO29CQUNOLE9BQUEsR0FBRyxDQUFDLFNBQVMsQ0FDVCxLQUFLLEVBQ0wsY0FBTyxDQUFDLEVBQ1IsY0FBTyxDQUFDLEVBQ1IsU0FBUyxFQUNULFNBQVMsQ0FDWjtnQkFORCxDQU1DLENBQ0osQ0FBQztZQUNOLENBQUMsQ0FBRSxDQUFDO1lBRUosRUFBRSxDQUFFLDRDQUE0QyxFQUFFLFVBQUEsSUFBSTtnQkFFbEQsSUFBTSxTQUFTLEdBQUcsRUFBRSxDQUFDO2dCQUVyQixJQUFNLEtBQUssR0FBRyxlQUFlLENBQUUsRUFBRSxDQUFFLENBQUM7Z0JBRXBDLElBQU0sR0FBRyxHQUFHLElBQUksK0JBQUcsQ0FBRSxZQUFZO2dCQUM3QixTQUFTO2dCQUNULFVBQUUsU0FBd0IsRUFBRSxJQUFpQjtvQkFFekMsYUFBTSxDQUFFLElBQUksQ0FBQyxLQUFLLENBQUUsQ0FBQyxFQUFFLENBQUMsS0FBSyxDQUFFLFNBQVMsQ0FBRSxDQUFDO29CQUUzQyxJQUFJLEVBQUUsQ0FBQztnQkFDWCxDQUFDLENBQ0osQ0FBRSxDQUFDO2dCQUVKLEdBQUcsQ0FBQyxJQUFJLENBQUU7b0JBQ04sT0FBQSxHQUFHLENBQUMsU0FBUyxDQUNULEtBQUssRUFDTCxjQUFPLENBQUMsRUFDUixjQUFPLENBQUMsRUFDUixTQUFTLEVBQ1QsU0FBUyxDQUNaO2dCQU5ELENBTUMsQ0FDSixDQUFDO1lBQ04sQ0FBQyxDQUFFLENBQUM7UUFDUixDQUFDLENBQUUsQ0FBQztJQUNSLENBQUMsQ0FBRSxDQUFDO0FBQ1IsQ0FBQyxDQUFFLENBQUM7QUFHSixTQUFTLFlBQVksQ0FBRSxTQUFjO0lBRWpDLElBQU0saUJBQWlCLEdBQUc7UUFDdEIsTUFBTSxFQUFFLFNBQVM7UUFDakIsV0FBVyxFQUFFLFVBQUUsQ0FBTSxFQUFFLEVBQU8sRUFBRSxDQUFNLElBQU0sT0FBQSxDQUFDLEVBQUUsRUFBSCxDQUFHO0tBQ2xELENBQUM7SUFFRixJQUFNLGNBQWMsR0FBRztRQUNuQixJQUFJLEVBQUosVUFBTSxDQUFNLEVBQUUsRUFBTyxFQUFFLENBQU07WUFFekIsQ0FBQyxDQUFFLElBQUksRUFBRTtnQkFDTCxPQUFPLEVBQUUsVUFBRSxDQUFNLElBQU0sT0FBQSxDQUFDLENBQUUsSUFBSSxFQUFFLEVBQUUsTUFBTSxFQUFFLENBQUMsRUFBRSxDQUFFLEVBQXhCLENBQXdCO2FBQ2xELENBQUUsQ0FBQztRQUNSLENBQUM7S0FDSixDQUFDO0lBRUYsSUFBTSxFQUFFLEdBQUc7UUFDUCxVQUFVLEVBQVYsVUFBWSxFQUFPLEVBQUUsQ0FBTTtZQUV2QixJQUFNLElBQUksR0FBRyxDQUFFLEVBQUUsS0FBSyxRQUFRLENBQUU7Z0JBQzVCLENBQUMsQ0FBQyxpQkFBaUI7Z0JBQ25CLENBQUMsQ0FBQyxjQUFjLENBQUM7WUFFckIsQ0FBQyxDQUFFLElBQUksRUFBRSxJQUFJLENBQUUsQ0FBQztRQUNwQixDQUFDO0tBQ0osQ0FBQztJQUVGLElBQU0sTUFBTSxHQUFHO1FBQ1gsSUFBSSxFQUFFLFVBQUUsQ0FBTSxJQUFNLE9BQUEsQ0FBQyxDQUFFLElBQUksRUFBRSxFQUFFLENBQUUsRUFBYixDQUFhO1FBQ2pDLEVBQUUsRUFBSSxjQUFPLENBQUM7S0FDakIsQ0FBQztJQUVGLE9BQU8sTUFBTSxDQUFDO0FBQ2xCLENBQUM7QUFHRCxTQUFTLGVBQWUsQ0FBRSxRQUE2QjtJQUVuRCxJQUFNLE9BQU8sR0FBWTtRQUNyQixLQUFLLEVBQWdCLGNBQU0sT0FBQSxHQUFHLEVBQUgsQ0FBRztRQUM5QixtQkFBbUIsRUFBRSxDQUFDO1FBQ3RCLElBQUksRUFBaUIsRUFBRTtRQUN2QixRQUFRLEVBQWEsRUFBRTtRQUN2QixJQUFJLEVBQWlCO1lBQ2pCLEtBQUssRUFBRyxFQUFFO1lBQ1YsTUFBTSxFQUFFLEVBQUU7WUFDVixNQUFNLEVBQUUsRUFBRTtZQUNWLEtBQUssRUFBRyxFQUFFO1lBQ1YsTUFBTSxFQUFFLEVBQUU7U0FDYjtRQUNELEtBQUssRUFBZ0IsRUFBRTtRQUN2QixTQUFTLEVBQVksY0FBTyxDQUFDO0tBQ2hDLENBQUM7SUFFRixJQUFNLEtBQUssR0FBb0I7UUFDM0IsU0FBUyxFQUFFLGNBQU0sT0FBaUIsQ0FBRTtZQUNoQyxPQUFPLEVBQUUsY0FBTyxDQUFDO1NBQ3BCLENBQUUsRUFGYyxDQUVkO1FBRUgsYUFBYSxFQUFFLGNBQU0sT0FBaUIsQ0FBRTtZQUNwQyxPQUFPLEVBQUUsY0FBTSxPQUFBLFFBQVEsRUFBUixDQUFRO1NBQzFCLENBQUUsRUFGa0IsQ0FFbEI7UUFFSCxLQUFLLEVBQWtCLGNBQU0sT0FBUyxHQUFHLEVBQVosQ0FBWTtRQUN6QyxpQkFBaUIsRUFBTSxjQUFNLE9BQUEsS0FBSyxFQUFMLENBQUs7UUFDbEMsa0JBQWtCLEVBQUssY0FBTSxPQUFlLENBQUMsRUFBaEIsQ0FBZ0I7UUFDN0MsWUFBWSxFQUFXLGNBQU0sT0FBZSxDQUFDLEVBQWhCLENBQWdCO1FBQzdDLHFCQUFxQixFQUFFLGNBQU0sT0FBQSxFQUFFLEVBQUYsQ0FBRTtRQUMvQixtQkFBbUIsRUFBSSxjQUFNLE9BQWlCLENBQUMsRUFBbEIsQ0FBa0I7UUFDL0MsVUFBVSxFQUFhLGNBQU0sT0FBQSxLQUFLLEVBQUwsQ0FBSztRQUNsQyxPQUFPLEVBQWdCLGNBQU0sT0FBQSxLQUFLLEVBQUwsQ0FBSztRQUNsQyxtQkFBbUIsRUFBSSxjQUFNLE9BQWlCLENBQUMsRUFBbEIsQ0FBa0I7UUFDL0MsaUJBQWlCLEVBQU0sY0FBTSxPQUFpQixDQUFDLEVBQWxCLENBQWtCO1FBQy9DLFlBQVksRUFBVyxjQUFNLE9BQUEsS0FBSyxFQUFMLENBQUs7UUFDbEMsYUFBYSxFQUFVLGNBQU0sT0FBQSxLQUFLLEVBQUwsQ0FBSztRQUNsQyxhQUFhLEVBQVUsY0FBTSxPQUFBLEtBQUssRUFBTCxDQUFLO1FBQ2xDLGFBQWEsRUFBVSxjQUFNLE9BQUEsQ0FBWSxFQUFFLGdCQUFnQixFQUFFLEdBQUcsRUFBRSxDQUFBLEVBQXJDLENBQXFDO1FBQ2xFLFVBQVUsRUFBYSxjQUFNLE9BQUEsT0FBTyxFQUFQLENBQU87UUFDcEMsZUFBZSxFQUFRLGNBQU0sT0FBQSxLQUFLLEVBQUwsQ0FBSztRQUNsQyxZQUFZLEVBQVcsY0FBTSxPQUFBLEtBQUssRUFBTCxDQUFLO1FBQ2xDLGdCQUFnQixFQUFPLGNBQU0sT0FBQSxDQUFDLEVBQUQsQ0FBQztRQUM5QixrQkFBa0IsRUFBSyxjQUFNLE9BQUEsS0FBSyxFQUFMLENBQUs7S0FDckMsQ0FBQztJQUVGLE9BQU8sS0FBSyxDQUFDO0FBQ2pCLENBQUMifQ== \ No newline at end of file diff --git a/test/server/db/MongoServerDaoTest.ts b/test/server/db/MongoServerDaoTest.ts index b0a83bb..58e6ab9 100644 --- a/test/server/db/MongoServerDaoTest.ts +++ b/test/server/db/MongoServerDaoTest.ts @@ -22,7 +22,7 @@ 'use strict'; import { MongoServerDao as Sut } from "../../../src/server/db/MongoServerDao"; -import { MongoSelector, MongoUpdate } from "mongodb"; +import { MongoSelector, MongoUpdate, MongoDb } from "mongodb"; import { expect, use as chai_use } from 'chai'; import { ServerSideQuote } from "../../../src/server/quote/ServerSideQuote"; import { PositiveInteger } from "../../../src/numeric"; @@ -139,7 +139,7 @@ describe( 'MongoServerDao', () => } ); -function createMockDb( on_update: any ) +function createMockDb( on_update: any ): MongoDb { const collection_quotes = { update: on_update, @@ -166,8 +166,9 @@ function createMockDb( on_update: any ) }, }; - const driver = { + const driver = { open: ( c: any ) => c( null, db ), + close: () => {}, on: () => {}, }; diff --git a/test/system/DeltaProcessorTest.ts b/test/system/DeltaProcessorTest.ts index d0cd8f8..3998d68 100644 --- a/test/system/DeltaProcessorTest.ts +++ b/test/system/DeltaProcessorTest.ts @@ -22,7 +22,8 @@ import { DeltaProcessor as Sut } from '../../src/system/DeltaProcessor'; import { AmqpPublisher } from '../../src/system/AmqpPublisher'; import { DeltaDao } from '../../src/system/db/DeltaDao'; -import { DeltaType } from "../../src/bucket/delta"; +import { DeltaType, DeltaDocument } from "../../src/bucket/delta"; +import { DocumentId } from '../../src/document/Document'; import { EventEmitter } from 'events'; import { expect, use as chai_use } from 'chai'; @@ -308,11 +309,12 @@ describe( 'system.DeltaProcessor', () => }[]>[ { label: 'No deltas are processed', - docs: [ + given: [ { id: 123, lastUpdate: 123123123, - bucket: {}, + data: {}, + ratedata: {}, rdelta: {}, }, ], @@ -324,7 +326,8 @@ describe( 'system.DeltaProcessor', () => { id: 123, lastUpdate: 123123123, - bucket: { foo: [ 'start_bar' ] }, + data: { foo: [ 'start_bar' ] }, + ratedata: {}, rdelta: { data: [ { @@ -341,14 +344,16 @@ describe( 'system.DeltaProcessor', () => ], expected: [ { - delta: { foo: [ 'first_bar' ] }, - bucket: { foo: [ 'first_bar' ] }, - doc_id: 123, + doc_id: 123, + delta: { foo: [ 'first_bar' ] }, + bucket: { foo: [ 'first_bar' ] }, + ratedata: {}, }, { - delta: { foo: [ 'second_bar' ] }, - bucket: { foo: [ 'second_bar' ] }, - doc_id: 123, + doc_id: 123, + delta: { foo: [ 'second_bar' ] }, + bucket: { foo: [ 'second_bar' ] }, + ratedata: {}, }, ], }, @@ -358,17 +363,18 @@ describe( 'system.DeltaProcessor', () => { id: 123, lastUpdate: 123123123, - bucket: { foo: 'start_bar' }, + data: { foo: [ 'start_bar_123' ] }, + ratedata: {}, rdelta: { data: [ { - data: { foo: [ 'second_bar' ] }, + data: { foo: [ 'second_bar_123' ] }, timestamp: 234, }, ], ratedata: [ { - data: { foo: [ 'first_bar' ] }, + data: { foo: [ 'first_bar_123' ] }, timestamp: 123, }, ], @@ -377,19 +383,20 @@ describe( 'system.DeltaProcessor', () => { id: 234, lastUpdate: 123123123, - bucket: { foo: 'start_bar' }, + data: { foo: [ 'start_bar_234' ] }, + ratedata: {}, rdelta: { data: [ { - data: { foo: [ 'first_bar' ] }, + data: { foo: [ 'first_bar_234' ] }, timestamp: 123, }, { - data: { foo: [ 'second_bar' ] }, + data: { foo: [ 'second_bar_234' ] }, timestamp: 234, }, { - data: { foo: [ 'third_bar' ] }, + data: { foo: [ 'third_bar_234' ] }, timestamp: 345, }, ], @@ -398,15 +405,16 @@ describe( 'system.DeltaProcessor', () => { id: 345, lastUpdate: 123123123, - bucket: { foo: 'start_bar' }, + data: { foo: [ 'start_bar_345' ] }, + ratedata: {}, rdelta: { ratedata: [ { - data: { foo: [ 'first_bar' ] }, + data: { foo: [ 'first_bar_345' ] }, timestamp: 123, }, { - data: { foo: [ 'second_bar' ] }, + data: { foo: [ 'second_bar_345' ] }, timestamp: 234, }, ], @@ -415,60 +423,73 @@ describe( 'system.DeltaProcessor', () => ], expected: [ { - delta: { foo: [ 'first_bar' ] }, - bucket: { foo: [ 'first_bar' ] }, - doc_id: 123, + doc_id: 123, + delta: { foo: [ 'first_bar_123' ] }, + bucket: { foo: [ 'start_bar_123' ] }, + ratedata: { foo: [ 'first_bar_123' ] }, }, { - delta: { foo: [ 'second_bar' ] }, - bucket: { foo: [ 'second_bar' ] }, - doc_id: 123, + doc_id: 123, + delta: { foo: [ 'second_bar_123' ] }, + bucket: { foo: [ 'second_bar_123' ] }, + ratedata: { foo: [ 'first_bar_123' ] }, }, { - delta: { foo: [ 'first_bar' ] }, - bucket: { foo: [ 'first_bar' ] }, - doc_id: 234, + doc_id: 234, + delta: { foo: [ 'first_bar_234' ] }, + bucket: { foo: [ 'first_bar_234' ] }, + ratedata: {}, }, { - delta: { foo: [ 'second_bar' ] }, - bucket: { foo: [ 'second_bar' ] }, - doc_id: 234, + doc_id: 234, + delta: { foo: [ 'second_bar_234' ] }, + bucket: { foo: [ 'second_bar_234' ] }, + ratedata: {}, }, { - delta: { foo: [ 'third_bar' ] }, - bucket: { foo: [ 'third_bar' ] }, - doc_id: 234, + doc_id: 234, + delta: { foo: [ 'third_bar_234' ] }, + bucket: { foo: [ 'third_bar_234' ] }, + ratedata: {}, }, { - delta: { foo: [ 'first_bar' ] }, - bucket: { foo: [ 'first_bar' ] }, - doc_id: 345, + doc_id: 345, + delta: { foo: [ 'first_bar_345' ] }, + bucket: { foo: [ 'start_bar_345' ] }, + ratedata: { foo: [ 'first_bar_345' ] }, }, { - delta: { foo: [ 'second_bar' ] }, - bucket: { foo: [ 'second_bar' ] }, - doc_id: 345, + doc_id: 345, + delta: { foo: [ 'second_bar_345' ] }, + bucket: { foo: [ 'start_bar_345' ] }, + ratedata: { foo: [ 'second_bar_345' ] }, }, ], }, - ] ).forEach( ( { given, expected, label } ) => it( label, () => + ] ).forEach( ( { label, given, expected } ) => it( label, () => { let published: any = []; const dao = createMockDeltaDao(); const publisher = createMockDeltaPublisher(); const emitter = new EventEmitter(); - dao.getUnprocessedDocuments = (): Promise[]> => + dao.getUnprocessedDocuments = (): Promise => { return Promise.resolve( given ); } - publisher.publish = ( delta, bucket, doc_id ): Promise => + publisher.publish = ( + doc_id, + delta, + bucket, + ratedata, + ): Promise => { published.push( { - delta: delta.data, - bucket: bucket, - doc_id: doc_id, + doc_id: doc_id, + delta: delta.data, + bucket: bucket, + ratedata: ratedata, } ); return Promise.resolve(); @@ -479,6 +500,203 @@ describe( 'system.DeltaProcessor', () => .then( _ => expect( published ).to.deep.equal( expected ) ); } ) ); } ); + + + describe( 'Error handling', () => + { + it( 'Marks document in error state and continues', () => + { + let published: any = []; + let error_flag_set = false; + const dao = createMockDeltaDao(); + const publisher = createMockDeltaPublisher(); + const emitter = new EventEmitter(); + const doc = [ { + id: 123, + lastUpdate: 123123123, + data: { foo: [ 'start_bar' ] }, + ratedata: {}, + rdelta: { + data: [ + { + data: { foo: [ 'first_bar' ] }, + timestamp: 123123, + type: 'data', + } + ], + ratedata: [], + }, + }, + { + id: 234, + lastUpdate: 123123123, + data: { foo: [ 'start_bar' ] }, + ratedata: {}, + rdelta: { + data: [ + { + data: { foo: [ 'first_bar' ] }, + timestamp: 123123, + type: 'data', + } + ], + ratedata: [], + }, + } ]; + + const expected_published = [ + { + doc_id: 123, + delta: { foo: [ 'first_bar' ] }, + bucket: { foo: [ 'first_bar' ] }, + ratedata: {}, + }, + { + doc_id: 234, + delta: { foo: [ 'first_bar' ] }, + bucket: { foo: [ 'first_bar' ] }, + ratedata: {}, + } + ]; + + const expected_error = 'Uh oh'; + + dao.getUnprocessedDocuments = (): Promise => + Promise.resolve( doc ); + + dao.markDocumentAsProcessed = ( _doc_id, _ts ): Promise => + Promise.reject( new Error( expected_error ) ); + + dao.setErrorFlag = (): Promise => + { + error_flag_set = true; + return Promise.resolve(); + } + + publisher.publish = ( + doc_id, + delta, + bucket, + ratedata, + ): Promise => + { + published.push( { + doc_id: doc_id, + delta: delta.data, + bucket: bucket, + ratedata: ratedata, + } ); + + return Promise.resolve(); + } + + // Prevent node from converting an error event into an error + emitter.on( 'error', () => {} ); + + return expect( new Sut( dao, publisher, emitter ).process() ) + .to.eventually.deep.equal( undefined ) + .then( _ => + { + expect( error_flag_set ).to.be.true; + expect( published ).to.deep.equal( expected_published ); + } ); + } ); + } ); + + + describe( 'Error handling', () => + { + it( 'Failure to set document error state further processing', () => + { + let published: any = []; + let caught_error = ''; + const dao = createMockDeltaDao(); + const publisher = createMockDeltaPublisher(); + const emitter = new EventEmitter(); + const doc = [ { + id: 123, + lastUpdate: 123123123, + data: { foo: [ 'start_bar' ] }, + ratedata: {}, + rdelta: { + data: [ + { + data: { foo: [ 'first_bar' ] }, + timestamp: 123123, + type: 'data', + } + ], + ratedata: [], + }, + }, + { + id: 234, + lastUpdate: 123123123, + data: { foo: [ 'start_bar' ] }, + ratedata: {}, + rdelta: { + data: [ + { + data: { foo: [ 'first_bar' ] }, + timestamp: 123123, + type: 'data', + } + ], + ratedata: [], + }, + } ]; + + // Only one is published + const expected_published = [ { + doc_id: 123, + delta: { foo: [ 'first_bar' ] }, + bucket: { foo: [ 'first_bar' ] }, + ratedata: {}, + } ]; + + const expected_error = 'Uh oh'; + + dao.getUnprocessedDocuments = (): Promise => + Promise.resolve( doc ); + + dao.markDocumentAsProcessed = ( _doc_id, _ts ): Promise => + Promise.reject( new Error( 'Couldn\'t mark document' ) ); + + dao.setErrorFlag = (): Promise => + Promise.reject( new Error( expected_error ) ); + + publisher.publish = ( + doc_id, + delta, + bucket, + ratedata, + ): Promise => + { + published.push( { + doc_id: doc_id, + delta: delta.data, + bucket: bucket, + ratedata: ratedata, + } ); + + return Promise.resolve(); + } + + // Prevent node from converting an error event into an error + emitter.on( 'error', () => {} ); + + return expect( + new Sut( dao, publisher, emitter ).process() + .catch( e => { caught_error = e.message } ) + ) + .to.eventually.deep.equal( undefined ) + .then( _ => + { + expect( caught_error ).to.equal( expected_error ); + expect( published ).to.deep.equal( expected_published ); + } ); + } ); + } ); } ); diff --git a/test/system/DeltaPublisherTest.ts b/test/system/DeltaPublisherTest.ts index 40663d0..fcb788c 100644 --- a/test/system/DeltaPublisherTest.ts +++ b/test/system/DeltaPublisherTest.ts @@ -19,13 +19,25 @@ * along with this program. If not, see . */ +import { AmqpConnection } from '../../src/system/amqp/AmqpConnection'; +import { Delta, DeltaResult, DeltaType } from '../../src/bucket/delta'; import { DeltaPublisher as Sut } from '../../src/system/DeltaPublisher'; -import { AmqpConfig } from '../../src/system/AmqpPublisher'; +import { DocumentId } from '../../src/document/Document'; +import { Duplex } from 'stream'; import { EventEmitter } from "events"; +import { hasContext } from '../../src/error/ContextError'; +import { AmqpError } from '../../src/error/AmqpError'; +import { Channel } from 'amqplib'; +import { + createAvroEncoder, + AvroEncoderCtr, + AvroSchema, +} from '../../src/system/avro/AvroFactory'; import { expect, use as chai_use } from 'chai'; chai_use( require( 'chai-as-promised' ) ); +const sinon = require( 'sinon' ); describe( 'server.DeltaPublisher', () => { @@ -33,24 +45,96 @@ describe( 'server.DeltaPublisher', () => { it( 'sends a message', () => { - const conf = createMockConf(); - const emitter = new EventEmitter(); + let publish_called = false; + const delta = createMockDelta(); + const bucket = createMockBucketData(); + const ratedata = createMockBucketData(); + const emitter = new EventEmitter(); + const conn = createMockAmqpConnection(); + conn.getAmqpChannel = () => + { + return { + publish: ( _: any, __: any, buf: any, ___: any ) => + { + expect( buf instanceof Buffer ).to.be.true; - console.log( new Sut( conf, emitter, ts_ctr ) ); - expect( true ).to.be.true + publish_called = true; + + return true; + } + }; + }; + + const sut = new Sut( emitter, ts_ctr, createAvroEncoder, conn ); + + return expect( + sut.publish( 123, delta, bucket, ratedata ) + ).to.eventually.deep.equal( undefined ) + .then( _ => + { + expect( publish_called ).to.be.true; + } ); } ); - } ); - describe( '#sendMessage', () => - { - it( 'sends a message', () => + ( <[string, () => Channel | undefined, Error, string ][]>[ + [ + 'Throws an error when publishing was unsuccessful', + () => + { + return { + publish: ( _: any, __: any, _buf: any, ___: any ) => + { + return false; + } + }; + }, + Error, + 'Delta publish failed' + ], + [ + 'Throws an error when no amqp channel is found', + () => + { + return undefined; + }, + AmqpError, + 'Error sending message: No channel' + ] + ] ).forEach( ( [ label, getChannelF, error_type, err_msg ] ) => + it( label, () => { - const conf = createMockConf(); - const emitter = new EventEmitter(); + const delta = createMockDelta(); + const bucket = createMockBucketData(); + const ratedata = createMockBucketData(); + const emitter = new EventEmitter(); + const conn = createMockAmqpConnection(); + const doc_id = 123; + const expected = { + doc_id: doc_id, + delta_type: delta.type, + delta_ts: delta.timestamp + } - console.log( new Sut( conf, emitter, ts_ctr ) ); - expect( true ).to.be.true - } ); + conn.getAmqpChannel = getChannelF; + + const result = new Sut( emitter, ts_ctr, createAvroEncoder, conn ) + .publish( doc_id, delta, bucket, ratedata ); + + return Promise.all( [ + expect( result ).to.eventually.be.rejectedWith( + error_type, err_msg + ), + result.catch( e => + { + if ( !hasContext( e ) ) + { + return expect.fail(); + } + + return expect( e.context ).to.deep.equal( expected ); + } ) + ] ); + } ) ); } ); describe( '#avroEncode parses', () => @@ -137,32 +221,26 @@ describe( 'server.DeltaPublisher', () => { it( label, () => { - let errorCalled = false; + const emitter = createMockEventEmitter(); + const conn = createMockAmqpConnection(); + const data = createMockData( delta_data ); + const sut = new Sut( + emitter, + ts_ctr, + createAvroEncoder, + conn, + ); - const emitter = { - emit( _event_id, _err ) + sut.avroEncode( data ) + .then( b => { - errorCalled = true; - - console.log( 'server.DeltaPublisher.Error' + _err ); - } - } - - const conf = createMockConf(); - const data = createMockData( delta_data ); - const sut = new Sut( conf, emitter, ts_ctr ); - const buffer = sut.avroEncode( data ); - - if ( valid ) - { - expect( typeof(buffer) ).to.equal( 'object' ); - } - else - { - expect( buffer ).to.equal( null ); - } - - expect( valid ).to.equal( !errorCalled ); + expect( typeof(b) ).to.equal( 'object' ); + expect( valid ).to.be.true; + } ) + .catch( _ => + { + expect( valid ).to.be.false; + } ); } ); } ); } ); @@ -301,9 +379,16 @@ describe( 'server.DeltaPublisher', () => { it( label, () => { - const emitter = {} - const conf = createMockConf(); - const sut = new Sut( conf, emitter, ts_ctr ); + const encoded = 'FooBar'; + const emitter = createMockEventEmitter(); + const conn = createMockAmqpConnection(); + const avroEncoderCtr = createMockEncoder( encoded ); + const sut = new Sut( + emitter, + ts_ctr, + avroEncoderCtr, + conn, + ); const actual = sut.setDataTypes( delta_data ); expect( actual ).to.deep.equal( expected ); @@ -312,14 +397,39 @@ describe( 'server.DeltaPublisher', () => } ); } ); + function ts_ctr(): UnixTimestamp { return Math.floor( new Date().getTime() / 1000 ); } -function createMockConf(): AmqpConfig + +function createMockEncoder( mock_encoded_data: string ): AvroEncoderCtr { - return {}; + return ( _schema: AvroSchema ) => + { + const mock = sinon.mock( Duplex ); + + mock.on = ( _: string, __: any ) => {}; + mock.end = ( _: any ) => { return mock_encoded_data; }; + + return mock; + }; +} + + +function createMockEventEmitter(): EventEmitter +{ + return {}; +} + + +function createMockAmqpConnection(): AmqpConnection +{ + return { + connect: () => {}, + getExchangeName: () => { 'Foo' }, + }; } @@ -339,11 +449,8 @@ function createMockData( delta_data: any ): any modified: 1573856916, top_visited_step: '2', }, - session: { - entity_name: 'Foobar', - entity_id: 123123 , - }, - data: null, + data: null, + ratedata: null, delta: { Data: { bucket: delta_data, @@ -356,4 +463,22 @@ function createMockData( delta_data: any ): any }, }, }; +} + + +function createMockBucketData(): Record +{ + return { + foo: [ 'bar', 'baz' ] + } +} + + +function createMockDelta(): Delta +{ + return >{ + type: 'data', + timestamp: 123123123, + data: >{}, + } } \ No newline at end of file diff --git a/test/system/EventLoggerTest.ts b/test/system/EventLoggerTest.ts deleted file mode 100644 index b3d5f0f..0000000 --- a/test/system/EventLoggerTest.ts +++ /dev/null @@ -1,103 +0,0 @@ -/** - * Event logger test - * - * Copyright (C) 2010-2019 R-T Specialty, LLC. - * - * This file is part of the Liza Data Collection Framework. - * - * liza is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -import { EventLogger as Sut } from '../../src/system/EventLogger'; -import { EventEmitter } from "events"; -import { expect } from 'chai'; - -const sinon = require( 'sinon' ); - -declare interface MockConsole extends Console { - getLevel(): string, -} - -describe( 'system.EventLogger captures and logs events', () => -{ - [ - { - event_id: 'document-processed', - console_level: 'log', - }, - { - event_id: 'delta-publish', - console_level: 'log', - }, - { - event_id: 'amqp-conn-error', - console_level: 'warn', - }, - { - event_id: 'amqp-reconnect', - console_level: 'warn', - }, - { - event_id: 'amqp-reconnect-fail', - console_level: 'error', - }, - { - event_id: 'avro-err', - console_level: 'error', - }, - { - event_id: 'dao-err', - console_level: 'error', - }, - { - event_id: 'publish-err', - console_level: 'error', - }, - ].forEach( ( { event_id, console_level } ) => - { - it( event_id + ' triggers console output level: ' + console_level, () => - { - const emitter = new EventEmitter(); - const con = createMockConsole(); - const env = 'test'; - - new Sut( con, env, emitter, ts_ctr ); - - emitter.emit( event_id ); - - expect( con.getLevel() ).to.equal( console_level ); - } ); - } ); -} ); - - -function ts_ctr(): UnixTimestamp -{ - return Math.floor( new Date().getTime() / 1000 ); -} - - -function createMockConsole(): MockConsole -{ - const mock = sinon.mock( console ); - - mock.level = ''; - mock.info = ( _str: string ) => { mock.level = 'info'; }; - mock.log = ( _str: string ) => { mock.level = 'log'; }; - mock.warn = ( _str: string ) => { mock.level = 'warn'; }; - mock.error = ( _str: string ) => { mock.level = 'error'; }; - mock.getLevel = () => mock.level; - - return mock; -} \ No newline at end of file diff --git a/test/system/EventMediatorTest.ts b/test/system/EventMediatorTest.ts new file mode 100644 index 0000000..581437c --- /dev/null +++ b/test/system/EventMediatorTest.ts @@ -0,0 +1,139 @@ +/** + * Event logger test + * + * Copyright (C) 2010-2019 R-T Specialty, LLC. + * + * This file is part of the Liza Data Collection Framework. + * + * liza is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +import { EventMediator as Sut } from '../../src/system/EventMediator'; +import { context } from '../../src/error/ContextError'; +import { EventEmitter } from "events"; +import { expect } from 'chai'; +import { PsrLogger } from '../../src/system/PsrLogger'; + + +describe( 'system.EventLogger captures and logs events', () => +{ + it( 'document-processed triggers log#notice', () => + { + let method_called = false; + + const event_id = 'document-processed'; + const emitter = new EventEmitter(); + const log = createMockLogger(); + + log.notice = ( _str: string ) => { method_called = true; }; + + new Sut( log, emitter ); + + emitter.emit( event_id ); + + expect( method_called ).to.be.true; + } ); + + it( 'delta-publish triggers log#notice', () => + { + let method_called = false; + + const event_id = 'delta-publish'; + const emitter = new EventEmitter(); + const log = createMockLogger(); + + log.notice = ( _str: string ) => { method_called = true; }; + + new Sut( log, emitter ); + + emitter.emit( event_id ); + + expect( method_called ).to.be.true; + } ); + + it( 'amqp-conn-error triggers log#warning', () => + { + let method_called = false; + + const event_id = 'amqp-conn-error'; + const emitter = new EventEmitter(); + const log = createMockLogger(); + + log.warning = ( _str: string ) => { method_called = true; }; + + new Sut( log, emitter ); + + emitter.emit( event_id ); + + expect( method_called ).to.be.true; + } ); + + it( 'amqp-reconnect triggers log#warning', () => + { + let method_called = false; + + const event_id = 'amqp-reconnect'; + const emitter = new EventEmitter(); + const log = createMockLogger(); + + log.warning = ( _str: string ) => { method_called = true; }; + + new Sut( log, emitter ); + + emitter.emit( event_id ); + + expect( method_called ).to.be.true; + } ); + + it( 'context is retrieved from error', () => + { + let method_called = false; + + const event_id = 'error'; + const err_msg = 'Foo'; + const emitter = new EventEmitter(); + const log = createMockLogger(); + const err_context = { bar: 'baz' }; + + log.error = ( str: string, context: any ) => + { + method_called = true; + + expect( str ).to.equal( err_msg ); + expect( context ).to.equal( err_context ); + }; + + new Sut( log, emitter ); + + emitter.emit( event_id, context( new Error( err_msg ), err_context ) ); + + expect( method_called ).to.be.true; + } ); +} ); + + +function createMockLogger(): PsrLogger +{ + return { + debug( _msg: string | object, _context: object ){}, + info( _msg: string | object, _context: object ){}, + notice( _msg: string | object, _context: object ){ console.log( 'asdasd msg: ', _msg ); }, + warning( _msg: string | object, _context: object ){}, + error( _msg: string | object, _context: object ){}, + critical( _msg: string | object, _context: object ){}, + alert( _msg: string | object, _context: object ){}, + emergency( _msg: string | object, _context: object ){}, + log( _level: any, _msg: string | object, _context: object ){}, + }; +} \ No newline at end of file diff --git a/test/system/MetricsCollectorTest.ts b/test/system/MetricsCollectorTest.ts index 07867a8..eafc77d 100644 --- a/test/system/MetricsCollectorTest.ts +++ b/test/system/MetricsCollectorTest.ts @@ -19,13 +19,15 @@ * along with this program. If not, see . */ -import { PrometheusFactory } from '../../src/system/PrometheusFactory'; +import { + PrometheusFactory, + PrometheusConfig, +} from '../../src/system/PrometheusFactory'; import { Histogram, Pushgateway, Counter, Gauge } from 'prom-client'; import { EventEmitter } from 'events'; import { expect } from 'chai'; import { MetricsCollector as Sut, - PrometheusConfig, MetricTimer, } from '../../src/system/MetricsCollector'; @@ -35,8 +37,8 @@ describe( 'system.MetricsCollector captures events and pushes metrics', () => { it( 'process-complete event is hooked', () => { - let histogram_called = false; - let counter_called = false; + let histogram_called = false; + let counter_called = false; const emitter = new EventEmitter(); const conf = createMockConfig(); @@ -46,18 +48,20 @@ describe( 'system.MetricsCollector captures events and pushes metrics', () => counter_cb: () => { counter_called = true }, } ); - new Sut( factory, conf, emitter, timer ); + const sut = new Sut( factory, conf, emitter, timer ); emitter.emit( 'delta-process-end' ); expect( histogram_called ).to.be.true; expect( counter_called ).to.be.true; + + sut.stop(); } ); it( 'process-error event is hooked', () => { - let counter_called = false; + let counter_called = false; const emitter = new EventEmitter(); const conf = createMockConfig(); @@ -66,11 +70,13 @@ describe( 'system.MetricsCollector captures events and pushes metrics', () => counter_cb: () => { counter_called = true }, } ); - new Sut( factory, conf, emitter, timer ); + const sut = new Sut( factory, conf, emitter, timer ); emitter.emit( 'delta-process-error' ); expect( counter_called ).to.be.true; + + sut.stop(); } ); @@ -80,7 +86,7 @@ describe( 'system.MetricsCollector captures events and pushes metrics', () => const uid = 'foo'; const start_time_ns = 1234; const end_time_ns = 5678; - const expected_ms = ( end_time_ns - start_time_ns ) / 1000; + const expected_ms = ( end_time_ns - start_time_ns ) / 1000000; const emitter = new EventEmitter(); const conf = createMockConfig(); const timer = createMockTimer( start_time_ns, end_time_ns ); @@ -88,12 +94,14 @@ describe( 'system.MetricsCollector captures events and pushes metrics', () => histogram_cb: ( n: number ) => { actual_ms = n }, } ); - new Sut( factory, conf, emitter, timer ); + const sut = new Sut( factory, conf, emitter, timer ); emitter.emit( 'delta-process-start', uid ); emitter.emit( 'delta-process-end', uid ); expect( actual_ms ).to.be.equal( expected_ms ); + + sut.stop(); } ); } ); diff --git a/test/system/StandardLoggerTest.ts b/test/system/StandardLoggerTest.ts new file mode 100644 index 0000000..918bfd1 --- /dev/null +++ b/test/system/StandardLoggerTest.ts @@ -0,0 +1,178 @@ +/** + * Event logger test + * + * Copyright (C) 2010-2019 R-T Specialty, LLC. + * + * This file is part of the Liza Data Collection Framework. + * + * liza is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +import { StandardLogger as Sut } from '../../src/system/StandardLogger'; +import { LogLevel } from '../../src/system/PsrLogger'; +import { expect } from 'chai'; + +const sinon = require( 'sinon' ); + +declare interface MockConsole extends Console { + getLevel(): string, + getStr(): string, +} + +describe( 'system.EventLogger captures and logs events', () => +{ + it( 'debug triggers console output level: info', () => + { + const con = createMockConsole(); + const env = 'test'; + const sut = new Sut( con, ts_ctr, env ); + + sut.debug( 'Foo' ); + + expect( con.getLevel() ).to.equal( 'info' ); + } ); + + it( 'info triggers console output level: info', () => + { + const con = createMockConsole(); + const env = 'test'; + const sut = new Sut( con, ts_ctr, env ); + + sut.info( 'Foo' ); + + expect( con.getLevel() ).to.equal( 'info' ); + } ); + + it( 'notice triggers console output level: log', () => + { + const con = createMockConsole(); + const env = 'test'; + const sut = new Sut( con, ts_ctr, env ); + + sut.notice( 'Foo' ); + + expect( con.getLevel() ).to.equal( 'log' ); + } ); + + it( 'warning triggers console output level: warn', () => + { + const con = createMockConsole(); + const env = 'test'; + const sut = new Sut( con, ts_ctr, env ); + + sut.warning( 'Foo' ); + + expect( con.getLevel() ).to.equal( 'warn' ); + } ); + + it( 'error triggers console output level: error', () => + { + const con = createMockConsole(); + const env = 'test'; + const sut = new Sut( con, ts_ctr, env ); + + sut.error( 'Foo' ); + + expect( con.getLevel() ).to.equal( 'error' ); + } ); + + it( 'critical triggers console output level: error', () => + { + const con = createMockConsole(); + const env = 'test'; + const sut = new Sut( con, ts_ctr, env ); + + sut.critical( 'Foo' ); + + expect( con.getLevel() ).to.equal( 'error' ); + } ); + + it( 'alert triggers console output level: error', () => + { + const con = createMockConsole(); + const env = 'test'; + const sut = new Sut( con, ts_ctr, env ); + + sut.alert( 'Foo' ); + + expect( con.getLevel() ).to.equal( 'error' ); + } ); + + it( 'emergency triggers console output level: error', () => + { + const con = createMockConsole(); + const env = 'test'; + const sut = new Sut( con, ts_ctr, env ); + + sut.emergency( 'Foo' ); + + expect( con.getLevel() ).to.equal( 'error' ); + } ); + + it( 'log triggers corresponding log level', () => + { + const con = createMockConsole(); + const env = 'test'; + const sut = new Sut( con, ts_ctr, env ); + + sut.log( LogLevel.ERROR, 'Foo' ); + + expect( con.getLevel() ).to.equal( 'error' ); + } ); + + it( 'Context is included in structured output', () => + { + const con = createMockConsole(); + const env = 'test'; + const sut = new Sut( con, ts_ctr, env ); + const context = { bar: 'baz' }; + const expected_output = { + message: 'Foo', + timestamp: 123123, + service: 'quote-server', + env: 'test', + severity: 'NOTICE', + context: { + bar: 'baz', + }, + }; + + sut.notice( 'Foo', context ); + + expect( con.getStr() ).to.deep.equal( expected_output ); + } ); +} ); + + +function ts_ctr(): UnixTimestamp +{ + return 123123; +} + + +function createMockConsole(): MockConsole +{ + const mock = sinon.mock( console ); + + mock.lvl = ''; + mock.str = ''; + mock.info = ( str: string ) => { mock.str = str; mock.lvl = 'info'; }; + mock.log = ( str: string ) => { mock.str = str; mock.lvl = 'log'; }; + mock.warn = ( str: string ) => { mock.str = str; mock.lvl = 'warn'; }; + mock.error = ( str: string ) => { mock.str = str; mock.lvl = 'error'; }; + mock.getLevel = () => mock.lvl; + mock.getStr = () => mock.str; + + return mock; +}