1
0
Fork 0

[DEV-5312] Reconnect AMQP when connection drops

master
Austin Schaffer 2019-11-25 12:26:39 -05:00
parent faa7e15760
commit e781a841b1
6 changed files with 140 additions and 79 deletions

View File

@ -41,15 +41,19 @@ const {
} = require( 'mongodb' );
// TODO: fix this
process.env.NODE_ENV = 'dev';
process.env.amqp_hostname = 'localhost';
process.env.amqp_port = '5672';
process.env.amqp_username = 'quote_referral';
process.env.amqp_password = 'Et7iojahwo4aePie9Cahng7Chu5eim4E';
process.env.amqp_vhost = 'quote';
process.env.amqp_exchange = 'quoteupdate';
process.env.prom_hostname = 'dmz2docker01.rsgcorp.local';
process.env.prom_port = '9091';
process.env.NODE_ENV = 'dev';
process.env.amqp_hostname = 'localhost';
process.env.amqp_port = '5672';
process.env.amqp_username = 'quote_referral';
process.env.amqp_password = 'Et7iojahwo4aePie9Cahng7Chu5eim4E';
process.env.amqp_frameMax = '0';
process.env.amqp_heartbeat = '2';
process.env.amqp_vhost = 'quote';
process.env.amqp_exchange = 'quoteupdate';
process.env.amqp_retries = '30';
process.env.amqp_retry_wait = '1';
process.env.prom_hostname = 'dmz2docker01.rsgcorp.local';
process.env.prom_port = '9091';
// Environment variables
const amqp_conf = _getAmqpConfig( process.env );
@ -58,19 +62,19 @@ const prom_conf = _getPrometheusConfig( process.env );
const env = process.env.NODE_ENV || 'Unknown Environment';
// Event handling
const event_emitter = new EventEmitter();
const event_dispatcher = new EventDispatcher( event_emitter );
const event_subscriber = new EventSubscriber( event_emitter );
const emitter = new EventEmitter();
const dispatcher = new EventDispatcher( emitter );
const subscriber = new EventSubscriber( emitter );
// Event subscribers
new DeltaLogger( env, event_subscriber, ts_ctr );
const metrics = new MetricsCollector( prom_conf, event_subscriber );
new DeltaLogger( env, subscriber, ts_ctr );
const metrics = new MetricsCollector( prom_conf, subscriber );
// Instantiate classes for processor
const db = _createDB( db_conf );
const dao = new MongoDeltaDao( db );
const publisher = new DeltaPublisher( amqp_conf, event_dispatcher, ts_ctr );
const processor = new DeltaProcessor( dao, publisher, event_dispatcher );
const publisher = new DeltaPublisher( amqp_conf, dispatcher, ts_ctr );
const processor = new DeltaProcessor( dao, publisher, dispatcher );
// If the dao intializes successfully then process on a two second interval
const interval_ms = 2000;
@ -80,7 +84,11 @@ let process_interval: NodeJS.Timer;
dao.init()
.then( _ =>
{
publisher.connect();
publisher.connect()
.catch( e =>
{
console.error( 'AMQP connection error: ' + e );
} );
} )
.then( _ =>
{
@ -98,7 +106,7 @@ dao.init()
interval_ms,
);
} )
.catch( err => { console.error( 'Mongo Error: ' + err ); } );
.catch( err => { console.error( 'Error: ' + err ); } );
/**
@ -253,16 +261,18 @@ function _getMongoConfig( env: any ): MongoDbConfig
function _getAmqpConfig( env: any ): AmqpConfig
{
return <AmqpConfig>{
"protocol": "amqp",
"hostname": env.amqp_hostname,
"port": +( env.amqp_port || 0 ),
"username": env.amqp_username,
"password": env.amqp_password,
"locale": "en_US",
"frameMax": 0,
"heartbeat": 0,
"vhost": env.amqp_vhost,
"exchange": env.amqp_exchange,
"protocol": "amqp",
"hostname": env.amqp_hostname,
"port": +( env.amqp_port || 0 ),
"username": env.amqp_username,
"password": env.amqp_password,
"locale": "en_US",
"frameMax": env.amqp_frameMax,
"heartbeat": env.amqp_heartbeat,
"vhost": env.amqp_vhost,
"exchange": env.amqp_exchange,
"retries": env.amqp_retries || 30,
"retry_wait": env.amqp_retry_wait || 1,
};
}

View File

@ -26,8 +26,41 @@ import { Options } from 'amqplib';
export interface AmqpConfig extends Options.Connect {
/** The protocol to connect with (should always be "amqp") */
protocol: string;
/** The hostname to connect to */
hostname: string;
/** The port to connect to */
port: number;
/** A username if one if required */
username?: string;
/** A password if one if required */
password?: string;
/** Locale (should always be "en_US") */
locale: string;
/** The size in bytes of the maximum frame allowed */
frameMax: number;
/** How often to check for a live connection */
heartBeat: number;
/** The virtual host we are on (e.g. live, demo, test) */
vhost?: string;
/** The name of a queue or exchange to publish to */
exchange: string;
/** The number of times to retry connecting */
retries: number;
/** The time to wait in between retries */
retry_wait: number;
}

View File

@ -65,11 +65,14 @@ export class DeltaLogger
*/
init(): void
{
this._registerEvent( 'document-processed', LogLevel.NOTICE );
this._registerEvent( 'delta-publish', LogLevel.NOTICE );
this._registerEvent( 'avro-err', LogLevel.ERROR );
this._registerEvent( 'mongodb-err', LogLevel.ERROR );
this._registerEvent( 'publish-err', LogLevel.ERROR );
this._registerEvent( 'document-processed', LogLevel.NOTICE );
this._registerEvent( 'delta-publish', LogLevel.NOTICE );
this._registerEvent( 'amqp-conn-error', LogLevel.WARNING );
this._registerEvent( 'amqp-reconnect', LogLevel.WARNING );
this._registerEvent( 'amqp-reconnect-fail', LogLevel.ERROR );
this._registerEvent( 'avro-err', LogLevel.ERROR );
this._registerEvent( 'mongodb-err', LogLevel.ERROR );
this._registerEvent( 'publish-err', LogLevel.ERROR );
}

View File

@ -21,23 +21,17 @@
* Publish delta message to a queue
*/
import { AmqpPublisher } from './AmqpPublisher';
import { AmqpPublisher, AmqpConfig } from './AmqpPublisher';
import { DeltaResult } from '../bucket/delta';
import { EventDispatcher } from './event/EventDispatcher';
import {
connect as amqpConnect,
Options,
Channel,
Connection,
} from 'amqplib';
const avro = require( 'avro-js' );
export interface AmqpConfig extends Options.Connect {
/** The name of a queue or exchange to publish to */
exchange: string;
}
export interface AvroSchema {
/** Write data to a buffer */
@ -94,6 +88,56 @@ export class DeltaPublisher implements AmqpPublisher
{
this._conn = conn;
// If there is an error, attemp to reconnect
this._conn.on( 'error', e =>
{
this._dispatcher.dispatch( 'amqp-conn-error', e );
let reconnect_interval: NodeJS.Timer;
let retry_count = 0;
const reconnect = () =>
{
if ( ++retry_count >= this._conf.retries )
{
clearInterval( reconnect_interval );
this._dispatcher.dispatch(
'amqp-reconnect-fail',
'Could not re-establish AMQP connection.'
);
return;
}
this._dispatcher.dispatch(
'amqp-reconnect',
'...attempting to re-establish AMQP connection'
);
this.connect()
.then( _ =>
{
clearInterval( reconnect_interval );
this._dispatcher.dispatch(
'amqp-reconnect',
'AMQP re-connected'
);
} )
.catch( e =>
{
this._dispatcher.dispatch( 'amqp-conn-error', e );
} );
}
reconnect_interval = setInterval(
reconnect,
( this._conf.retry_wait * 1000 )
);
} );
return this._conn.createChannel();
} )
.then( ( ch: Channel ) =>
@ -141,8 +185,6 @@ export class DeltaPublisher implements AmqpPublisher
{
return new Promise<NullableError>( ( resolve, reject ) =>
{
const startTime = process.hrtime();
this.sendMessage( delta )
.then( _ =>
{
@ -153,8 +195,6 @@ export class DeltaPublisher implements AmqpPublisher
+ '" exchange',
);
console.log('#publish: '
+ process.hrtime( startTime )[0] / 10000 );
resolve();
return;
} )
@ -184,13 +224,9 @@ export class DeltaPublisher implements AmqpPublisher
{
return new Promise<NullableError>( ( resolve, reject ) =>
{
const startTime = process.hrtime();
const ts = this._ts_ctr();
const headers = { version: 1, created: ts };
const delta_data = this.avroFormat( delta.data );
console.log('#sendmessage 1: '
+ (process.hrtime( startTime )[ 1 ] / 10000) + 'ms');
const event_id = this.DELTA_MAP[ delta.type ];
const avro_buffer = this.avroEncode( {
event: {
@ -223,8 +259,6 @@ export class DeltaPublisher implements AmqpPublisher
},
},
} );
console.log('#sendmessage 2: '
+ (process.hrtime( startTime )[ 1 ] / 10000) + 'ms');
if ( !this._conn )
{
@ -241,8 +275,6 @@ export class DeltaPublisher implements AmqpPublisher
reject( 'Error sending message: No avro buffer' );
return;
}
console.log('#sendmessage 3: '
+ (process.hrtime( startTime )[ 1 ] / 10000) + 'ms');
// we don't use a routing key; fanout exchange
const published_successfully = this._channel.publish(
@ -254,8 +286,6 @@ export class DeltaPublisher implements AmqpPublisher
if ( published_successfully )
{
console.log('#sendmessage 4: '
+ (process.hrtime( startTime )[ 1 ] / 10000) + 'ms');
resolve();
return;
}

View File

@ -130,7 +130,14 @@ export class MetricsCollector
} );
// Push metrics on a specific intervals
setInterval( () => { this.pushMetrics(); }, this._push_interval_ms );
setInterval(
() =>
{
this._gateway.pushAdd(
{ jobName: 'liza_delta_metrics' }, this.pushCallback
);
}, this._push_interval_ms
);
// Subsribe metrics to events
this.subscribeMetrics();
@ -146,7 +153,6 @@ export class MetricsCollector
'delta-process-complete',
( val ) =>
{
console.log( 'Got time: ' + val + 'ms' );
this._process_time_hist.observe( val );
this._process_delta_count.inc();
}
@ -159,25 +165,6 @@ export class MetricsCollector
}
/**
* Push metrics to Prometheus PushGateway
*/
private pushMetrics(): void
{
// this._gateway.pushAdd( this._process_time_params, this.pushCallback );
// this._gateway.pushAdd( this._process_error_params, this.pushCallback );
// this._gateway.pushAdd( this._current_error_params, this.pushCallback );
// this._gateway.pushAdd( this._process_delta_params, this.pushCallback );
this._gateway.pushAdd(
{
jobName: 'liza_delta_metrics'
},
this.pushCallback
);
}
/**
* Handle push error
*

View File

@ -20,10 +20,8 @@
*/
import { EventDispatcher } from '../../src/system/event/EventDispatcher';
import {
DeltaPublisher as Sut,
AmqpConfig
} from "../../src/system/DeltaPublisher";
import { DeltaPublisher as Sut } from '../../src/system/DeltaPublisher';
import { AmqpConfig } from '../../src/system/AmqpPublisher';
import { expect, use as chai_use } from 'chai';
import { EventEmitter } from "events";