diff --git a/bin/delta-processor.ts b/bin/delta-processor.ts index 92dd7a3..9813169 100644 --- a/bin/delta-processor.ts +++ b/bin/delta-processor.ts @@ -27,8 +27,6 @@ import { DeltaPublisher } from "../src/system/DeltaPublisher"; import { MongoDb, MongoDbConfig } from "../src/types/mongodb"; import { DeltaLogger } from "../src/system/DeltaLogger"; import { EventEmitter } from "events"; -import { EventDispatcher } from "../src/system/event/EventDispatcher"; -import { EventSubscriber } from "../src/system/event/EventSubscriber"; import { MetricsCollector, PrometheusConfig, @@ -63,18 +61,16 @@ const env = process.env.NODE_ENV || 'Unknown Environment'; // Event handling const emitter = new EventEmitter(); -const dispatcher = new EventDispatcher( emitter ); -const subscriber = new EventSubscriber( emitter ); // Event subscribers -new DeltaLogger( env, subscriber, ts_ctr ); -const metrics = new MetricsCollector( prom_conf, subscriber ); +new DeltaLogger( env, emitter, ts_ctr ); +const metrics = new MetricsCollector( prom_conf, emitter ); // Instantiate classes for processor const db = _createDB( db_conf ); const dao = new MongoDeltaDao( db ); -const publisher = new DeltaPublisher( amqp_conf, dispatcher, ts_ctr ); -const processor = new DeltaProcessor( dao, publisher, dispatcher ); +const publisher = new DeltaPublisher( amqp_conf, emitter, ts_ctr ); +const processor = new DeltaProcessor( dao, publisher, emitter ); // If the dao intializes successfully then process on a two second interval const interval_ms = 2000; diff --git a/src/system/DeltaLogger.ts b/src/system/DeltaLogger.ts index 3d06157..53161b1 100644 --- a/src/system/DeltaLogger.ts +++ b/src/system/DeltaLogger.ts @@ -21,7 +21,7 @@ * Logger for delta events */ -import { EventSubscriber } from "./event/EventSubscriber"; +import { EventEmitter } from "events"; enum LogLevel { DEBUG, @@ -47,13 +47,13 @@ export class DeltaLogger /** * Initialize delta logger * - * @param _env - The environment ( dev, test, demo, live ) - * @param _subscriber - An event subscriber - * @param _ts_ctr - a timestamp constructor + * @param _env - The environment ( dev, test, demo, live ) + * @param _emitter - An event emitter + * @param _ts_ctr - a timestamp constructor */ constructor( private readonly _env: string, - private readonly _subscriber: EventSubscriber, + private readonly _emitter: EventEmitter, private readonly _ts_ctr : () => UnixTimestamp, ) { this.init(); @@ -86,7 +86,7 @@ export class DeltaLogger { const logF = this._getLogLevelFunction( level ) - this._subscriber.subscribe( event_id, logF ); + this._emitter.on( event_id, logF ); } diff --git a/src/system/DeltaProcessor.ts b/src/system/DeltaProcessor.ts index db5be5b..d7137bd 100644 --- a/src/system/DeltaProcessor.ts +++ b/src/system/DeltaProcessor.ts @@ -24,7 +24,7 @@ import { MongoDeltaType } from "../system/db/MongoDeltaDao"; import { DeltaResult } from "../bucket/delta"; import { DocumentId } from "../document/Document"; import { AmqpPublisher } from "./AmqpPublisher"; -import { EventDispatcher } from "./event/EventDispatcher"; +import { EventEmitter } from "events"; /** * Process deltas for a quote and publish to a queue @@ -41,14 +41,14 @@ export class DeltaProcessor /** * Initialize processor * - * @param _dao - Mongo collection - * @param _publisher - Amqp Publisher - * @param _dispatcher - Event dispatcher instance + * @param _dao - Mongo collection + * @param _publisher - Amqp Publisher + * @param _emitter - Event emiter instance */ constructor( - private readonly _dao: DeltaDao, - private readonly _publisher: AmqpPublisher, - private readonly _dispatcher: EventDispatcher + private readonly _dao: DeltaDao, + private readonly _publisher: AmqpPublisher, + private readonly _emitter: EventEmitter ) {} @@ -90,7 +90,7 @@ export class DeltaProcessor // this document if there was an error if ( error ) { - self._dispatcher.dispatch( + self._emitter.emit( 'delta-process-error', error ); @@ -101,7 +101,7 @@ export class DeltaProcessor { const elapsedTime = process.hrtime( startTime ); - self._dispatcher.dispatch( + self._emitter.emit( 'delta-process-complete', elapsedTime[ 1 ] / 10000 ); @@ -111,7 +111,7 @@ export class DeltaProcessor self._dao.markDocumentAsProcessed( doc_id, last_updated_ts ) .then( _ => { - self._dispatcher.dispatch( + self._emitter.emit( 'document-processed', 'Deltas on document ' + doc_id + ' processed ' + 'successfully. Document has been marked as ' @@ -120,13 +120,13 @@ export class DeltaProcessor } ) .catch( err => { - self._dispatcher.dispatch( 'mongodb-err', err ); + self._emitter.emit( 'mongodb-err', err ); } ); } ); } ) .catch( err => { - self._dispatcher.dispatch( 'mongodb-err', err ); + self._emitter.emit( 'mongodb-err', err ); } ); } diff --git a/src/system/DeltaPublisher.ts b/src/system/DeltaPublisher.ts index 1fe9e3c..0a99ac9 100644 --- a/src/system/DeltaPublisher.ts +++ b/src/system/DeltaPublisher.ts @@ -23,7 +23,7 @@ import { AmqpPublisher, AmqpConfig } from './AmqpPublisher'; import { DeltaResult } from '../bucket/delta'; -import { EventDispatcher } from './event/EventDispatcher'; +import { EventEmitter } from "events"; import { connect as amqpConnect, Channel, @@ -68,9 +68,9 @@ export class DeltaPublisher implements AmqpPublisher * @param _ts_ctr - a timestamp constructor */ constructor( - private readonly _conf: AmqpConfig, - private readonly _dispatcher: EventDispatcher, - private readonly _ts_ctr : () => UnixTimestamp, + private readonly _conf: AmqpConfig, + private readonly _emitter: EventEmitter, + private readonly _ts_ctr: () => UnixTimestamp, ) { this._type = avro.parse( this.SCHEMA_PATH ); } @@ -91,7 +91,7 @@ export class DeltaPublisher implements AmqpPublisher // If there is an error, attemp to reconnect this._conn.on( 'error', e => { - this._dispatcher.dispatch( 'amqp-conn-error', e ); + this._emitter.emit( 'amqp-conn-error', e ); let reconnect_interval: NodeJS.Timer; @@ -103,7 +103,7 @@ export class DeltaPublisher implements AmqpPublisher { clearInterval( reconnect_interval ); - this._dispatcher.dispatch( + this._emitter.emit( 'amqp-reconnect-fail', 'Could not re-establish AMQP connection.' ); @@ -111,7 +111,7 @@ export class DeltaPublisher implements AmqpPublisher return; } - this._dispatcher.dispatch( + this._emitter.emit( 'amqp-reconnect', '...attempting to re-establish AMQP connection' ); @@ -121,14 +121,14 @@ export class DeltaPublisher implements AmqpPublisher { clearInterval( reconnect_interval ); - this._dispatcher.dispatch( + this._emitter.emit( 'amqp-reconnect', 'AMQP re-connected' ); } ) .catch( e => { - this._dispatcher.dispatch( 'amqp-conn-error', e ); + this._emitter.emit( 'amqp-conn-error', e ); } ); } @@ -188,7 +188,7 @@ export class DeltaPublisher implements AmqpPublisher this.sendMessage( delta ) .then( _ => { - this._dispatcher.dispatch( + this._emitter.emit( 'delta-publish', "Published " + delta.type + " delta with ts '" + delta.timestamp + "' to '" + this._conf.exchange @@ -200,7 +200,7 @@ export class DeltaPublisher implements AmqpPublisher } ) .catch( e => { - this._dispatcher.dispatch( + this._emitter.emit( 'publish-err', "Error publishing " + delta.type + " delta with ts '" + delta.timestamp + '" to "' + this._conf.exchange @@ -310,7 +310,7 @@ export class DeltaPublisher implements AmqpPublisher { if ( !this._type ) { - this._dispatcher.dispatch( + this._emitter.emit( 'avro-err', 'No avro scheama found', ); @@ -322,7 +322,7 @@ export class DeltaPublisher implements AmqpPublisher } catch( e ) { - this._dispatcher.dispatch( + this._emitter.emit( 'avro-err', 'Error encoding data to avro: ' + e, ); diff --git a/src/system/MetricsCollector.ts b/src/system/MetricsCollector.ts index 4f38c46..2976858 100644 --- a/src/system/MetricsCollector.ts +++ b/src/system/MetricsCollector.ts @@ -21,22 +21,14 @@ * Collect Metrics for Prometheus */ -import { EventSubscriber } from "./event/EventSubscriber"; import { DeltaDao } from "./db/DeltaDao"; import { PositiveInteger } from "../numeric"; import { Histogram, Pushgateway, Counter, Gauge } from 'prom-client'; +import { EventEmitter } from "events"; const client = require( 'prom-client' ); -// declare type MetricStructure = { -// path: string; -// code: number; -// service: string; -// env: string; -// } - - export declare type PrometheusConfig = { /** The hostname to connect to */ hostname: string; @@ -85,19 +77,17 @@ export class MetricsCollector * Initialize delta logger * * @param _conf - the prometheus configuration - * @param _subscriber - the event subscriber + * @param _emitter - the event emitr */ constructor( private readonly _conf: PrometheusConfig, - private readonly _subscriber: EventSubscriber, + private readonly _emitter: EventEmitter, ) { // Set labels - const default_labels = { + client.register.setDefaultLabels( { env: this._conf.env, service: 'delta_processor', - }; - - client.register.setDefaultLabels( default_labels ); + } ); // Create gateway const url = 'http://' + this._conf.hostname + ':' + this._conf.port; @@ -140,25 +130,25 @@ export class MetricsCollector ); // Subsribe metrics to events - this.subscribeMetrics(); + this.emitMetrics(); } /** - * Subscribe metrics + * emit metrics */ - private subscribeMetrics() + private emitMetrics() { - this._subscriber.subscribe( + this._emitter.on( 'delta-process-complete', - ( val ) => + ( val: any ) => { this._process_time_hist.observe( val ); this._process_delta_count.inc(); } ); - this._subscriber.subscribe( + this._emitter.on( 'delta-process-error', ( _ ) => this._process_error_count.inc() ); @@ -178,29 +168,11 @@ export class MetricsCollector _body?: any ): void { - // console.log( 'Push callback' ); - // console.error( error, response, body ); + console.log( 'Push callback' ); + console.error( _error ); } - /** - * Get structured metric object - * - * @param path - the endpoint being hit - * @param code - the response code - * - * @returns a structured logging object - */ - // private _formatMetricVal( label: string, val: any ): MetricStructure - // { - // return { - // path: path, - // code: code, - // service: 'quote-server', - // env: this._conf.env, - // }; - // } - /** * Look for mongodb delta errors and update metrics if found diff --git a/src/system/db/DeltaDao.ts b/src/system/db/DeltaDao.ts index 173fcee..5ad09fc 100644 --- a/src/system/db/DeltaDao.ts +++ b/src/system/db/DeltaDao.ts @@ -52,7 +52,7 @@ export interface DeltaDao advanceDeltaIndex( doc_id: DocumentId, type: string, - ): Promise + ): Promise /** @@ -67,7 +67,7 @@ export interface DeltaDao markDocumentAsProcessed( doc_id: DocumentId, last_update_ts: UnixTimestamp, - ): Promise + ): Promise /** @@ -77,7 +77,7 @@ export interface DeltaDao * * @return any errors that occurred */ - setErrorFlag( doc_id: DocumentId ): Promise + setErrorFlag( doc_id: DocumentId ): Promise /** @@ -85,6 +85,6 @@ export interface DeltaDao * * @return a count of the documents in an error state */ - getErrorCount(): Promise + getErrorCount(): Promise } diff --git a/src/system/db/MongoDeltaDao.ts b/src/system/db/MongoDeltaDao.ts index 81816e0..81640c7 100644 --- a/src/system/db/MongoDeltaDao.ts +++ b/src/system/db/MongoDeltaDao.ts @@ -61,7 +61,7 @@ export class MongoDeltaDao implements DeltaDao * * @return any errors that occurred */ - init(): Promise + init(): Promise { var dao = this; @@ -177,7 +177,7 @@ export class MongoDeltaDao implements DeltaDao advanceDeltaIndex( doc_id: DocumentId, type: MongoDeltaType, - ): Promise + ): Promise { return new Promise( ( resolve, reject ) => { @@ -217,7 +217,7 @@ export class MongoDeltaDao implements DeltaDao markDocumentAsProcessed( doc_id: DocumentId, last_update_ts: UnixTimestamp, - ): Promise + ): Promise { return new Promise( ( resolve, reject ) => { @@ -248,7 +248,7 @@ export class MongoDeltaDao implements DeltaDao * * @return any errors that occurred */ - setErrorFlag( doc_id: DocumentId ): Promise + setErrorFlag( doc_id: DocumentId ): Promise { return new Promise( ( resolve, reject ) => { @@ -277,7 +277,7 @@ export class MongoDeltaDao implements DeltaDao * * @return a count of the documents in an error state */ - getErrorCount(): Promise + getErrorCount(): Promise { return new Promise( ( resolve, reject ) => { diff --git a/src/system/event/EventDispatcher.ts b/src/system/event/EventDispatcher.ts deleted file mode 100644 index 45a15b8..0000000 --- a/src/system/event/EventDispatcher.ts +++ /dev/null @@ -1,44 +0,0 @@ -/** - * Event Dispatcher - * - * Copyright (C) 2010-2019 R-T Specialty, LLC. - * - * This file is part of liza. - * - * liza is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - * - * Dispatch events - */ - -import { EventEmitter } from "events"; - -export class EventDispatcher extends EventEmitter -{ - /** - * Initialize dispatcher - * - * @param _emitter - the event emitter - */ - constructor( - private readonly _emitter: EventEmitter - ) { - super(); - } - - - dispatch( event_id: string, arg: any ): void - { - this._emitter.emit( event_id, arg ); - } -} diff --git a/src/system/event/EventSubscriber.ts b/src/system/event/EventSubscriber.ts deleted file mode 100644 index 2950460..0000000 --- a/src/system/event/EventSubscriber.ts +++ /dev/null @@ -1,44 +0,0 @@ -/** - * Event Subscriber - * - * Copyright (C) 2010-2019 R-T Specialty, LLC. - * - * This file is part of liza. - * - * liza is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - * - * Subscribe to events - */ - -import { EventEmitter } from "events"; - -export class EventSubscriber extends EventEmitter -{ - /** - * Initialize subscriber - * - * @param _emitter - the event emitter - */ - constructor( - private readonly _emitter: EventEmitter - ) { - super(); - } - - - subscribe( event_id: string, callback: ( arg: any ) => void ): void - { - this._emitter.on( event_id, callback ); - } -} diff --git a/test/system/DeltaProcessorTest.ts b/test/system/DeltaProcessorTest.ts index bccd4a4..e1db8d2 100644 --- a/test/system/DeltaProcessorTest.ts +++ b/test/system/DeltaProcessorTest.ts @@ -23,11 +23,9 @@ import { DeltaProcessor as Sut } from '../../src/system/DeltaProcessor'; import { AmqpPublisher } from '../../src/system/AmqpPublisher'; import { DeltaDao } from '../../src/system/db/DeltaDao'; import { MongoDeltaType } from '../../src/system/db/MongoDeltaDao'; -import { EventDispatcher } from '../../src/system/event/EventDispatcher'; - +import { EventEmitter } from 'events'; import { expect, use as chai_use } from 'chai'; -import { EventEmitter } from 'events'; chai_use( require( 'chai-as-promised' ) ); @@ -169,7 +167,7 @@ describe( 'system.DeltaProcessor', () => const sut = new Sut( createMockDeltaDao(), createMockDeltaPublisher(), - new EventDispatcher( new EventEmitter() ), + new EventEmitter(), ); const actual = sut.getTimestampSortedDeltas( given ); @@ -291,7 +289,7 @@ describe( 'system.DeltaProcessor', () => const sut = new Sut( createMockDeltaDao(), createMockDeltaPublisher(), - new EventDispatcher( new EventEmitter() ), + new EventEmitter(), ); const actual = sut.getDeltas( given, type ); diff --git a/test/system/DeltaPublisherTest.ts b/test/system/DeltaPublisherTest.ts index da9a553..cf3872f 100644 --- a/test/system/DeltaPublisherTest.ts +++ b/test/system/DeltaPublisherTest.ts @@ -19,12 +19,11 @@ * along with this program. If not, see . */ -import { EventDispatcher } from '../../src/system/event/EventDispatcher'; import { DeltaPublisher as Sut } from '../../src/system/DeltaPublisher'; import { AmqpConfig } from '../../src/system/AmqpPublisher'; +import { EventEmitter } from "events"; import { expect, use as chai_use } from 'chai'; -import { EventEmitter } from "events"; chai_use( require( 'chai-as-promised' ) ); @@ -34,10 +33,10 @@ describe( 'server.DeltaPublisher', () => { it( 'sends a message', () => { - const conf = createMockConf(); - const dispatcher = new EventDispatcher( new EventEmitter() ); + const conf = createMockConf(); + const emitter = new EventEmitter(); - console.log( new Sut( conf, dispatcher, ts_ctr ) ); + console.log( new Sut( conf, emitter, ts_ctr ) ); expect( true ).to.be.true } ); } ); @@ -46,10 +45,10 @@ describe( 'server.DeltaPublisher', () => { it( 'sends a message', () => { - const conf = createMockConf(); - const dispatcher = new EventDispatcher( new EventEmitter() ); + const conf = createMockConf(); + const emitter = new EventEmitter(); - console.log( new Sut( conf, dispatcher, ts_ctr ) ); + console.log( new Sut( conf, emitter, ts_ctr ) ); expect( true ).to.be.true } ); } ); @@ -140,8 +139,8 @@ describe( 'server.DeltaPublisher', () => { let errorCalled = false; - const dispatcher = { - dispatch( _event_id, _err ) + const emitter = { + emit( _event_id, _err ) { errorCalled = true; @@ -151,7 +150,7 @@ describe( 'server.DeltaPublisher', () => const conf = createMockConf(); const data = createMockData( delta_data ); - const sut = new Sut( conf, dispatcher, ts_ctr ); + const sut = new Sut( conf, emitter, ts_ctr ); const buffer = sut.avroEncode( data ); if ( valid ) @@ -302,10 +301,10 @@ describe( 'server.DeltaPublisher', () => { it( label, () => { - const dispatcher = {} - const conf = createMockConf(); - const sut = new Sut( conf, dispatcher, ts_ctr ); - const actual = sut.avroFormat( delta_data ); + const emitter = {} + const conf = createMockConf(); + const sut = new Sut( conf, emitter, ts_ctr ); + const actual = sut.avroFormat( delta_data ); expect( actual ).to.deep.equal( expected ); } );