1
0
Fork 0

[DEV-5312] Generalize event subscribers and dispatchers

master
Austin Schaffer 2019-11-25 12:42:03 -05:00
parent e781a841b1
commit 5ee9a5d340
11 changed files with 74 additions and 197 deletions

View File

@ -27,8 +27,6 @@ import { DeltaPublisher } from "../src/system/DeltaPublisher";
import { MongoDb, MongoDbConfig } from "../src/types/mongodb";
import { DeltaLogger } from "../src/system/DeltaLogger";
import { EventEmitter } from "events";
import { EventDispatcher } from "../src/system/event/EventDispatcher";
import { EventSubscriber } from "../src/system/event/EventSubscriber";
import {
MetricsCollector,
PrometheusConfig,
@ -63,18 +61,16 @@ const env = process.env.NODE_ENV || 'Unknown Environment';
// Event handling
const emitter = new EventEmitter();
const dispatcher = new EventDispatcher( emitter );
const subscriber = new EventSubscriber( emitter );
// Event subscribers
new DeltaLogger( env, subscriber, ts_ctr );
const metrics = new MetricsCollector( prom_conf, subscriber );
new DeltaLogger( env, emitter, ts_ctr );
const metrics = new MetricsCollector( prom_conf, emitter );
// Instantiate classes for processor
const db = _createDB( db_conf );
const dao = new MongoDeltaDao( db );
const publisher = new DeltaPublisher( amqp_conf, dispatcher, ts_ctr );
const processor = new DeltaProcessor( dao, publisher, dispatcher );
const publisher = new DeltaPublisher( amqp_conf, emitter, ts_ctr );
const processor = new DeltaProcessor( dao, publisher, emitter );
// If the dao intializes successfully then process on a two second interval
const interval_ms = 2000;

View File

@ -21,7 +21,7 @@
* Logger for delta events
*/
import { EventSubscriber } from "./event/EventSubscriber";
import { EventEmitter } from "events";
enum LogLevel {
DEBUG,
@ -47,13 +47,13 @@ export class DeltaLogger
/**
* Initialize delta logger
*
* @param _env - The environment ( dev, test, demo, live )
* @param _subscriber - An event subscriber
* @param _ts_ctr - a timestamp constructor
* @param _env - The environment ( dev, test, demo, live )
* @param _emitter - An event emitter
* @param _ts_ctr - a timestamp constructor
*/
constructor(
private readonly _env: string,
private readonly _subscriber: EventSubscriber,
private readonly _emitter: EventEmitter,
private readonly _ts_ctr : () => UnixTimestamp,
) {
this.init();
@ -86,7 +86,7 @@ export class DeltaLogger
{
const logF = this._getLogLevelFunction( level )
this._subscriber.subscribe( event_id, logF );
this._emitter.on( event_id, logF );
}

View File

@ -24,7 +24,7 @@ import { MongoDeltaType } from "../system/db/MongoDeltaDao";
import { DeltaResult } from "../bucket/delta";
import { DocumentId } from "../document/Document";
import { AmqpPublisher } from "./AmqpPublisher";
import { EventDispatcher } from "./event/EventDispatcher";
import { EventEmitter } from "events";
/**
* Process deltas for a quote and publish to a queue
@ -41,14 +41,14 @@ export class DeltaProcessor
/**
* Initialize processor
*
* @param _dao - Mongo collection
* @param _publisher - Amqp Publisher
* @param _dispatcher - Event dispatcher instance
* @param _dao - Mongo collection
* @param _publisher - Amqp Publisher
* @param _emitter - Event emiter instance
*/
constructor(
private readonly _dao: DeltaDao,
private readonly _publisher: AmqpPublisher,
private readonly _dispatcher: EventDispatcher
private readonly _dao: DeltaDao,
private readonly _publisher: AmqpPublisher,
private readonly _emitter: EventEmitter
) {}
@ -90,7 +90,7 @@ export class DeltaProcessor
// this document if there was an error
if ( error )
{
self._dispatcher.dispatch(
self._emitter.emit(
'delta-process-error',
error
);
@ -101,7 +101,7 @@ export class DeltaProcessor
{
const elapsedTime = process.hrtime( startTime );
self._dispatcher.dispatch(
self._emitter.emit(
'delta-process-complete',
elapsedTime[ 1 ] / 10000
);
@ -111,7 +111,7 @@ export class DeltaProcessor
self._dao.markDocumentAsProcessed( doc_id, last_updated_ts )
.then( _ =>
{
self._dispatcher.dispatch(
self._emitter.emit(
'document-processed',
'Deltas on document ' + doc_id + ' processed '
+ 'successfully. Document has been marked as '
@ -120,13 +120,13 @@ export class DeltaProcessor
} )
.catch( err =>
{
self._dispatcher.dispatch( 'mongodb-err', err );
self._emitter.emit( 'mongodb-err', err );
} );
} );
} )
.catch( err =>
{
self._dispatcher.dispatch( 'mongodb-err', err );
self._emitter.emit( 'mongodb-err', err );
} );
}

View File

@ -23,7 +23,7 @@
import { AmqpPublisher, AmqpConfig } from './AmqpPublisher';
import { DeltaResult } from '../bucket/delta';
import { EventDispatcher } from './event/EventDispatcher';
import { EventEmitter } from "events";
import {
connect as amqpConnect,
Channel,
@ -68,9 +68,9 @@ export class DeltaPublisher implements AmqpPublisher
* @param _ts_ctr - a timestamp constructor
*/
constructor(
private readonly _conf: AmqpConfig,
private readonly _dispatcher: EventDispatcher,
private readonly _ts_ctr : () => UnixTimestamp,
private readonly _conf: AmqpConfig,
private readonly _emitter: EventEmitter,
private readonly _ts_ctr: () => UnixTimestamp,
) {
this._type = avro.parse( this.SCHEMA_PATH );
}
@ -91,7 +91,7 @@ export class DeltaPublisher implements AmqpPublisher
// If there is an error, attemp to reconnect
this._conn.on( 'error', e =>
{
this._dispatcher.dispatch( 'amqp-conn-error', e );
this._emitter.emit( 'amqp-conn-error', e );
let reconnect_interval: NodeJS.Timer;
@ -103,7 +103,7 @@ export class DeltaPublisher implements AmqpPublisher
{
clearInterval( reconnect_interval );
this._dispatcher.dispatch(
this._emitter.emit(
'amqp-reconnect-fail',
'Could not re-establish AMQP connection.'
);
@ -111,7 +111,7 @@ export class DeltaPublisher implements AmqpPublisher
return;
}
this._dispatcher.dispatch(
this._emitter.emit(
'amqp-reconnect',
'...attempting to re-establish AMQP connection'
);
@ -121,14 +121,14 @@ export class DeltaPublisher implements AmqpPublisher
{
clearInterval( reconnect_interval );
this._dispatcher.dispatch(
this._emitter.emit(
'amqp-reconnect',
'AMQP re-connected'
);
} )
.catch( e =>
{
this._dispatcher.dispatch( 'amqp-conn-error', e );
this._emitter.emit( 'amqp-conn-error', e );
} );
}
@ -188,7 +188,7 @@ export class DeltaPublisher implements AmqpPublisher
this.sendMessage( delta )
.then( _ =>
{
this._dispatcher.dispatch(
this._emitter.emit(
'delta-publish',
"Published " + delta.type + " delta with ts '"
+ delta.timestamp + "' to '" + this._conf.exchange
@ -200,7 +200,7 @@ export class DeltaPublisher implements AmqpPublisher
} )
.catch( e =>
{
this._dispatcher.dispatch(
this._emitter.emit(
'publish-err',
"Error publishing " + delta.type + " delta with ts '"
+ delta.timestamp + '" to "' + this._conf.exchange
@ -310,7 +310,7 @@ export class DeltaPublisher implements AmqpPublisher
{
if ( !this._type )
{
this._dispatcher.dispatch(
this._emitter.emit(
'avro-err',
'No avro scheama found',
);
@ -322,7 +322,7 @@ export class DeltaPublisher implements AmqpPublisher
}
catch( e )
{
this._dispatcher.dispatch(
this._emitter.emit(
'avro-err',
'Error encoding data to avro: ' + e,
);

View File

@ -21,22 +21,14 @@
* Collect Metrics for Prometheus
*/
import { EventSubscriber } from "./event/EventSubscriber";
import { DeltaDao } from "./db/DeltaDao";
import { PositiveInteger } from "../numeric";
import { Histogram, Pushgateway, Counter, Gauge } from 'prom-client';
import { EventEmitter } from "events";
const client = require( 'prom-client' );
// declare type MetricStructure = {
// path: string;
// code: number;
// service: string;
// env: string;
// }
export declare type PrometheusConfig = {
/** The hostname to connect to */
hostname: string;
@ -85,19 +77,17 @@ export class MetricsCollector
* Initialize delta logger
*
* @param _conf - the prometheus configuration
* @param _subscriber - the event subscriber
* @param _emitter - the event emitr
*/
constructor(
private readonly _conf: PrometheusConfig,
private readonly _subscriber: EventSubscriber,
private readonly _emitter: EventEmitter,
) {
// Set labels
const default_labels = {
client.register.setDefaultLabels( {
env: this._conf.env,
service: 'delta_processor',
};
client.register.setDefaultLabels( default_labels );
} );
// Create gateway
const url = 'http://' + this._conf.hostname + ':' + this._conf.port;
@ -140,25 +130,25 @@ export class MetricsCollector
);
// Subsribe metrics to events
this.subscribeMetrics();
this.emitMetrics();
}
/**
* Subscribe metrics
* emit metrics
*/
private subscribeMetrics()
private emitMetrics()
{
this._subscriber.subscribe(
this._emitter.on(
'delta-process-complete',
( val ) =>
( val: any ) =>
{
this._process_time_hist.observe( val );
this._process_delta_count.inc();
}
);
this._subscriber.subscribe(
this._emitter.on(
'delta-process-error',
( _ ) => this._process_error_count.inc()
);
@ -178,29 +168,11 @@ export class MetricsCollector
_body?: any
): void
{
// console.log( 'Push callback' );
// console.error( error, response, body );
console.log( 'Push callback' );
console.error( _error );
}
/**
* Get structured metric object
*
* @param path - the endpoint being hit
* @param code - the response code
*
* @returns a structured logging object
*/
// private _formatMetricVal( label: string, val: any ): MetricStructure
// {
// return <MetricStructure>{
// path: path,
// code: code,
// service: 'quote-server',
// env: this._conf.env,
// };
// }
/**
* Look for mongodb delta errors and update metrics if found

View File

@ -52,7 +52,7 @@ export interface DeltaDao
advanceDeltaIndex(
doc_id: DocumentId,
type: string,
): Promise<NullableError>
): Promise<null>
/**
@ -67,7 +67,7 @@ export interface DeltaDao
markDocumentAsProcessed(
doc_id: DocumentId,
last_update_ts: UnixTimestamp,
): Promise<NullableError>
): Promise<null>
/**
@ -77,7 +77,7 @@ export interface DeltaDao
*
* @return any errors that occurred
*/
setErrorFlag( doc_id: DocumentId ): Promise<NullableError>
setErrorFlag( doc_id: DocumentId ): Promise<null>
/**
@ -85,6 +85,6 @@ export interface DeltaDao
*
* @return a count of the documents in an error state
*/
getErrorCount(): Promise<number | Error>
getErrorCount(): Promise<number>
}

View File

@ -61,7 +61,7 @@ export class MongoDeltaDao implements DeltaDao
*
* @return any errors that occurred
*/
init(): Promise<NullableError>
init(): Promise<null>
{
var dao = this;
@ -177,7 +177,7 @@ export class MongoDeltaDao implements DeltaDao
advanceDeltaIndex(
doc_id: DocumentId,
type: MongoDeltaType,
): Promise<NullableError>
): Promise<null>
{
return new Promise( ( resolve, reject ) =>
{
@ -217,7 +217,7 @@ export class MongoDeltaDao implements DeltaDao
markDocumentAsProcessed(
doc_id: DocumentId,
last_update_ts: UnixTimestamp,
): Promise<NullableError>
): Promise<null>
{
return new Promise( ( resolve, reject ) =>
{
@ -248,7 +248,7 @@ export class MongoDeltaDao implements DeltaDao
*
* @return any errors that occurred
*/
setErrorFlag( doc_id: DocumentId ): Promise<NullableError>
setErrorFlag( doc_id: DocumentId ): Promise<null>
{
return new Promise( ( resolve, reject ) =>
{
@ -277,7 +277,7 @@ export class MongoDeltaDao implements DeltaDao
*
* @return a count of the documents in an error state
*/
getErrorCount(): Promise<number | Error>
getErrorCount(): Promise<number>
{
return new Promise( ( resolve, reject ) =>
{

View File

@ -1,44 +0,0 @@
/**
* Event Dispatcher
*
* Copyright (C) 2010-2019 R-T Specialty, LLC.
*
* This file is part of liza.
*
* liza is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* Dispatch events
*/
import { EventEmitter } from "events";
export class EventDispatcher extends EventEmitter
{
/**
* Initialize dispatcher
*
* @param _emitter - the event emitter
*/
constructor(
private readonly _emitter: EventEmitter
) {
super();
}
dispatch( event_id: string, arg: any ): void
{
this._emitter.emit( event_id, arg );
}
}

View File

@ -1,44 +0,0 @@
/**
* Event Subscriber
*
* Copyright (C) 2010-2019 R-T Specialty, LLC.
*
* This file is part of liza.
*
* liza is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* Subscribe to events
*/
import { EventEmitter } from "events";
export class EventSubscriber extends EventEmitter
{
/**
* Initialize subscriber
*
* @param _emitter - the event emitter
*/
constructor(
private readonly _emitter: EventEmitter
) {
super();
}
subscribe( event_id: string, callback: ( arg: any ) => void ): void
{
this._emitter.on( event_id, callback );
}
}

View File

@ -23,11 +23,9 @@ import { DeltaProcessor as Sut } from '../../src/system/DeltaProcessor';
import { AmqpPublisher } from '../../src/system/AmqpPublisher';
import { DeltaDao } from '../../src/system/db/DeltaDao';
import { MongoDeltaType } from '../../src/system/db/MongoDeltaDao';
import { EventDispatcher } from '../../src/system/event/EventDispatcher';
import { EventEmitter } from 'events';
import { expect, use as chai_use } from 'chai';
import { EventEmitter } from 'events';
chai_use( require( 'chai-as-promised' ) );
@ -169,7 +167,7 @@ describe( 'system.DeltaProcessor', () =>
const sut = new Sut(
createMockDeltaDao(),
createMockDeltaPublisher(),
new EventDispatcher( new EventEmitter() ),
new EventEmitter(),
);
const actual = sut.getTimestampSortedDeltas( given );
@ -291,7 +289,7 @@ describe( 'system.DeltaProcessor', () =>
const sut = new Sut(
createMockDeltaDao(),
createMockDeltaPublisher(),
new EventDispatcher( new EventEmitter() ),
new EventEmitter(),
);
const actual = sut.getDeltas( given, type );

View File

@ -19,12 +19,11 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
import { EventDispatcher } from '../../src/system/event/EventDispatcher';
import { DeltaPublisher as Sut } from '../../src/system/DeltaPublisher';
import { AmqpConfig } from '../../src/system/AmqpPublisher';
import { EventEmitter } from "events";
import { expect, use as chai_use } from 'chai';
import { EventEmitter } from "events";
chai_use( require( 'chai-as-promised' ) );
@ -34,10 +33,10 @@ describe( 'server.DeltaPublisher', () =>
{
it( 'sends a message', () =>
{
const conf = createMockConf();
const dispatcher = new EventDispatcher( new EventEmitter() );
const conf = createMockConf();
const emitter = new EventEmitter();
console.log( new Sut( conf, dispatcher, ts_ctr ) );
console.log( new Sut( conf, emitter, ts_ctr ) );
expect( true ).to.be.true
} );
} );
@ -46,10 +45,10 @@ describe( 'server.DeltaPublisher', () =>
{
it( 'sends a message', () =>
{
const conf = createMockConf();
const dispatcher = new EventDispatcher( new EventEmitter() );
const conf = createMockConf();
const emitter = new EventEmitter();
console.log( new Sut( conf, dispatcher, ts_ctr ) );
console.log( new Sut( conf, emitter, ts_ctr ) );
expect( true ).to.be.true
} );
} );
@ -140,8 +139,8 @@ describe( 'server.DeltaPublisher', () =>
{
let errorCalled = false;
const dispatcher = <EventDispatcher>{
dispatch( _event_id, _err )
const emitter = <EventEmitter>{
emit( _event_id, _err )
{
errorCalled = true;
@ -151,7 +150,7 @@ describe( 'server.DeltaPublisher', () =>
const conf = createMockConf();
const data = createMockData( delta_data );
const sut = new Sut( conf, dispatcher, ts_ctr );
const sut = new Sut( conf, emitter, ts_ctr );
const buffer = sut.avroEncode( data );
if ( valid )
@ -302,10 +301,10 @@ describe( 'server.DeltaPublisher', () =>
{
it( label, () =>
{
const dispatcher = <EventDispatcher>{}
const conf = createMockConf();
const sut = new Sut( conf, dispatcher, ts_ctr );
const actual = sut.avroFormat( delta_data );
const emitter = <EventEmitter>{}
const conf = createMockConf();
const sut = new Sut( conf, emitter, ts_ctr );
const actual = sut.avroFormat( delta_data );
expect( actual ).to.deep.equal( expected );
} );