diff --git a/bin/delta-processor.ts b/bin/delta-processor.ts index 1dde6df..92dd7a3 100644 --- a/bin/delta-processor.ts +++ b/bin/delta-processor.ts @@ -41,15 +41,19 @@ const { } = require( 'mongodb' ); // TODO: fix this -process.env.NODE_ENV = 'dev'; -process.env.amqp_hostname = 'localhost'; -process.env.amqp_port = '5672'; -process.env.amqp_username = 'quote_referral'; -process.env.amqp_password = 'Et7iojahwo4aePie9Cahng7Chu5eim4E'; -process.env.amqp_vhost = 'quote'; -process.env.amqp_exchange = 'quoteupdate'; -process.env.prom_hostname = 'dmz2docker01.rsgcorp.local'; -process.env.prom_port = '9091'; +process.env.NODE_ENV = 'dev'; +process.env.amqp_hostname = 'localhost'; +process.env.amqp_port = '5672'; +process.env.amqp_username = 'quote_referral'; +process.env.amqp_password = 'Et7iojahwo4aePie9Cahng7Chu5eim4E'; +process.env.amqp_frameMax = '0'; +process.env.amqp_heartbeat = '2'; +process.env.amqp_vhost = 'quote'; +process.env.amqp_exchange = 'quoteupdate'; +process.env.amqp_retries = '30'; +process.env.amqp_retry_wait = '1'; +process.env.prom_hostname = 'dmz2docker01.rsgcorp.local'; +process.env.prom_port = '9091'; // Environment variables const amqp_conf = _getAmqpConfig( process.env ); @@ -58,19 +62,19 @@ const prom_conf = _getPrometheusConfig( process.env ); const env = process.env.NODE_ENV || 'Unknown Environment'; // Event handling -const event_emitter = new EventEmitter(); -const event_dispatcher = new EventDispatcher( event_emitter ); -const event_subscriber = new EventSubscriber( event_emitter ); +const emitter = new EventEmitter(); +const dispatcher = new EventDispatcher( emitter ); +const subscriber = new EventSubscriber( emitter ); // Event subscribers -new DeltaLogger( env, event_subscriber, ts_ctr ); -const metrics = new MetricsCollector( prom_conf, event_subscriber ); +new DeltaLogger( env, subscriber, ts_ctr ); +const metrics = new MetricsCollector( prom_conf, subscriber ); // Instantiate classes for processor const db = _createDB( db_conf ); const dao = new MongoDeltaDao( db ); -const publisher = new DeltaPublisher( amqp_conf, event_dispatcher, ts_ctr ); -const processor = new DeltaProcessor( dao, publisher, event_dispatcher ); +const publisher = new DeltaPublisher( amqp_conf, dispatcher, ts_ctr ); +const processor = new DeltaProcessor( dao, publisher, dispatcher ); // If the dao intializes successfully then process on a two second interval const interval_ms = 2000; @@ -80,7 +84,11 @@ let process_interval: NodeJS.Timer; dao.init() .then( _ => { - publisher.connect(); + publisher.connect() + .catch( e => + { + console.error( 'AMQP connection error: ' + e ); + } ); } ) .then( _ => { @@ -98,7 +106,7 @@ dao.init() interval_ms, ); } ) -.catch( err => { console.error( 'Mongo Error: ' + err ); } ); +.catch( err => { console.error( 'Error: ' + err ); } ); /** @@ -253,16 +261,18 @@ function _getMongoConfig( env: any ): MongoDbConfig function _getAmqpConfig( env: any ): AmqpConfig { return { - "protocol": "amqp", - "hostname": env.amqp_hostname, - "port": +( env.amqp_port || 0 ), - "username": env.amqp_username, - "password": env.amqp_password, - "locale": "en_US", - "frameMax": 0, - "heartbeat": 0, - "vhost": env.amqp_vhost, - "exchange": env.amqp_exchange, + "protocol": "amqp", + "hostname": env.amqp_hostname, + "port": +( env.amqp_port || 0 ), + "username": env.amqp_username, + "password": env.amqp_password, + "locale": "en_US", + "frameMax": env.amqp_frameMax, + "heartbeat": env.amqp_heartbeat, + "vhost": env.amqp_vhost, + "exchange": env.amqp_exchange, + "retries": env.amqp_retries || 30, + "retry_wait": env.amqp_retry_wait || 1, }; } diff --git a/src/system/AmqpPublisher.ts b/src/system/AmqpPublisher.ts index 622d659..26b4949 100644 --- a/src/system/AmqpPublisher.ts +++ b/src/system/AmqpPublisher.ts @@ -26,8 +26,41 @@ import { Options } from 'amqplib'; export interface AmqpConfig extends Options.Connect { + /** The protocol to connect with (should always be "amqp") */ + protocol: string; + + /** The hostname to connect to */ + hostname: string; + + /** The port to connect to */ + port: number; + + /** A username if one if required */ + username?: string; + + /** A password if one if required */ + password?: string; + + /** Locale (should always be "en_US") */ + locale: string; + + /** The size in bytes of the maximum frame allowed */ + frameMax: number; + + /** How often to check for a live connection */ + heartBeat: number; + + /** The virtual host we are on (e.g. live, demo, test) */ + vhost?: string; + /** The name of a queue or exchange to publish to */ exchange: string; + + /** The number of times to retry connecting */ + retries: number; + + /** The time to wait in between retries */ + retry_wait: number; } diff --git a/src/system/DeltaLogger.ts b/src/system/DeltaLogger.ts index f286387..3d06157 100644 --- a/src/system/DeltaLogger.ts +++ b/src/system/DeltaLogger.ts @@ -65,11 +65,14 @@ export class DeltaLogger */ init(): void { - this._registerEvent( 'document-processed', LogLevel.NOTICE ); - this._registerEvent( 'delta-publish', LogLevel.NOTICE ); - this._registerEvent( 'avro-err', LogLevel.ERROR ); - this._registerEvent( 'mongodb-err', LogLevel.ERROR ); - this._registerEvent( 'publish-err', LogLevel.ERROR ); + this._registerEvent( 'document-processed', LogLevel.NOTICE ); + this._registerEvent( 'delta-publish', LogLevel.NOTICE ); + this._registerEvent( 'amqp-conn-error', LogLevel.WARNING ); + this._registerEvent( 'amqp-reconnect', LogLevel.WARNING ); + this._registerEvent( 'amqp-reconnect-fail', LogLevel.ERROR ); + this._registerEvent( 'avro-err', LogLevel.ERROR ); + this._registerEvent( 'mongodb-err', LogLevel.ERROR ); + this._registerEvent( 'publish-err', LogLevel.ERROR ); } diff --git a/src/system/DeltaPublisher.ts b/src/system/DeltaPublisher.ts index c718756..1fe9e3c 100644 --- a/src/system/DeltaPublisher.ts +++ b/src/system/DeltaPublisher.ts @@ -21,23 +21,17 @@ * Publish delta message to a queue */ -import { AmqpPublisher } from './AmqpPublisher'; +import { AmqpPublisher, AmqpConfig } from './AmqpPublisher'; import { DeltaResult } from '../bucket/delta'; import { EventDispatcher } from './event/EventDispatcher'; import { connect as amqpConnect, - Options, Channel, Connection, } from 'amqplib'; const avro = require( 'avro-js' ); -export interface AmqpConfig extends Options.Connect { - /** The name of a queue or exchange to publish to */ - exchange: string; -} - export interface AvroSchema { /** Write data to a buffer */ @@ -94,6 +88,56 @@ export class DeltaPublisher implements AmqpPublisher { this._conn = conn; + // If there is an error, attemp to reconnect + this._conn.on( 'error', e => + { + this._dispatcher.dispatch( 'amqp-conn-error', e ); + + let reconnect_interval: NodeJS.Timer; + + let retry_count = 0; + + const reconnect = () => + { + if ( ++retry_count >= this._conf.retries ) + { + clearInterval( reconnect_interval ); + + this._dispatcher.dispatch( + 'amqp-reconnect-fail', + 'Could not re-establish AMQP connection.' + ); + + return; + } + + this._dispatcher.dispatch( + 'amqp-reconnect', + '...attempting to re-establish AMQP connection' + ); + + this.connect() + .then( _ => + { + clearInterval( reconnect_interval ); + + this._dispatcher.dispatch( + 'amqp-reconnect', + 'AMQP re-connected' + ); + } ) + .catch( e => + { + this._dispatcher.dispatch( 'amqp-conn-error', e ); + } ); + } + + reconnect_interval = setInterval( + reconnect, + ( this._conf.retry_wait * 1000 ) + ); + } ); + return this._conn.createChannel(); } ) .then( ( ch: Channel ) => @@ -141,8 +185,6 @@ export class DeltaPublisher implements AmqpPublisher { return new Promise( ( resolve, reject ) => { - const startTime = process.hrtime(); - this.sendMessage( delta ) .then( _ => { @@ -153,8 +195,6 @@ export class DeltaPublisher implements AmqpPublisher + '" exchange', ); - console.log('#publish: ' - + process.hrtime( startTime )[0] / 10000 ); resolve(); return; } ) @@ -184,13 +224,9 @@ export class DeltaPublisher implements AmqpPublisher { return new Promise( ( resolve, reject ) => { - const startTime = process.hrtime(); - const ts = this._ts_ctr(); const headers = { version: 1, created: ts }; const delta_data = this.avroFormat( delta.data ); - console.log('#sendmessage 1: ' - + (process.hrtime( startTime )[ 1 ] / 10000) + 'ms'); const event_id = this.DELTA_MAP[ delta.type ]; const avro_buffer = this.avroEncode( { event: { @@ -223,8 +259,6 @@ export class DeltaPublisher implements AmqpPublisher }, }, } ); - console.log('#sendmessage 2: ' - + (process.hrtime( startTime )[ 1 ] / 10000) + 'ms'); if ( !this._conn ) { @@ -241,8 +275,6 @@ export class DeltaPublisher implements AmqpPublisher reject( 'Error sending message: No avro buffer' ); return; } - console.log('#sendmessage 3: ' - + (process.hrtime( startTime )[ 1 ] / 10000) + 'ms'); // we don't use a routing key; fanout exchange const published_successfully = this._channel.publish( @@ -254,8 +286,6 @@ export class DeltaPublisher implements AmqpPublisher if ( published_successfully ) { - console.log('#sendmessage 4: ' - + (process.hrtime( startTime )[ 1 ] / 10000) + 'ms'); resolve(); return; } diff --git a/src/system/MetricsCollector.ts b/src/system/MetricsCollector.ts index 2d7ca24..4f38c46 100644 --- a/src/system/MetricsCollector.ts +++ b/src/system/MetricsCollector.ts @@ -130,7 +130,14 @@ export class MetricsCollector } ); // Push metrics on a specific intervals - setInterval( () => { this.pushMetrics(); }, this._push_interval_ms ); + setInterval( + () => + { + this._gateway.pushAdd( + { jobName: 'liza_delta_metrics' }, this.pushCallback + ); + }, this._push_interval_ms + ); // Subsribe metrics to events this.subscribeMetrics(); @@ -146,7 +153,6 @@ export class MetricsCollector 'delta-process-complete', ( val ) => { - console.log( 'Got time: ' + val + 'ms' ); this._process_time_hist.observe( val ); this._process_delta_count.inc(); } @@ -159,25 +165,6 @@ export class MetricsCollector } - /** - * Push metrics to Prometheus PushGateway - */ - private pushMetrics(): void - { - // this._gateway.pushAdd( this._process_time_params, this.pushCallback ); - // this._gateway.pushAdd( this._process_error_params, this.pushCallback ); - // this._gateway.pushAdd( this._current_error_params, this.pushCallback ); - // this._gateway.pushAdd( this._process_delta_params, this.pushCallback ); - - this._gateway.pushAdd( - { - jobName: 'liza_delta_metrics' - }, - this.pushCallback - ); - } - - /** * Handle push error * diff --git a/test/system/DeltaPublisherTest.ts b/test/system/DeltaPublisherTest.ts index fecef2d..da9a553 100644 --- a/test/system/DeltaPublisherTest.ts +++ b/test/system/DeltaPublisherTest.ts @@ -20,10 +20,8 @@ */ import { EventDispatcher } from '../../src/system/event/EventDispatcher'; -import { - DeltaPublisher as Sut, - AmqpConfig -} from "../../src/system/DeltaPublisher"; +import { DeltaPublisher as Sut } from '../../src/system/DeltaPublisher'; +import { AmqpConfig } from '../../src/system/AmqpPublisher'; import { expect, use as chai_use } from 'chai'; import { EventEmitter } from "events";