diff --git a/bin/delta-processor.ts b/bin/delta-processor.ts
index 836cb83..1dde6df 100644
--- a/bin/delta-processor.ts
+++ b/bin/delta-processor.ts
@@ -18,6 +18,7 @@
* You should have received a copy of the GNU General Public License
* along with this program. If not, see .
*/
+import fs = require( 'fs' );
import { AmqpConfig } from "../src/system/AmqpPublisher";
import { MongoDeltaDao } from "../src/system/db/MongoDeltaDao";
@@ -28,27 +29,32 @@ import { DeltaLogger } from "../src/system/DeltaLogger";
import { EventEmitter } from "events";
import { EventDispatcher } from "../src/system/event/EventDispatcher";
import { EventSubscriber } from "../src/system/event/EventSubscriber";
-// import { MetricsCollector } from "../src/system/MetricsCollector";
+import {
+ MetricsCollector,
+ PrometheusConfig,
+} from "../src/system/MetricsCollector";
const {
Db: MongoDb,
Server: MongoServer,
- Connection: MongoConnection,
ReplServers: ReplSetServers,
-} = require( 'mongodb/lib/mongodb' );
+} = require( 'mongodb' );
// TODO: fix this
-process.env.hostname = 'localhost';
-process.env.port = '5672';
-process.env.username = 'quote_referral';
-process.env.password = 'Et7iojahwo4aePie9Cahng7Chu5eim4E';
-process.env.vhost = 'quote';
-process.env.exchange = 'quoteupdate';
-
+process.env.NODE_ENV = 'dev';
+process.env.amqp_hostname = 'localhost';
+process.env.amqp_port = '5672';
+process.env.amqp_username = 'quote_referral';
+process.env.amqp_password = 'Et7iojahwo4aePie9Cahng7Chu5eim4E';
+process.env.amqp_vhost = 'quote';
+process.env.amqp_exchange = 'quoteupdate';
+process.env.prom_hostname = 'dmz2docker01.rsgcorp.local';
+process.env.prom_port = '9091';
// Environment variables
const amqp_conf = _getAmqpConfig( process.env );
const db_conf = _getMongoConfig( process.env );
+const prom_conf = _getPrometheusConfig( process.env );
const env = process.env.NODE_ENV || 'Unknown Environment';
// Event handling
@@ -57,8 +63,8 @@ const event_dispatcher = new EventDispatcher( event_emitter );
const event_subscriber = new EventSubscriber( event_emitter );
// Event subscribers
-new DeltaLogger( env, event_subscriber, ts_ctr ).init();
-// new MetricsCollector( env, event_subscriber );
+new DeltaLogger( env, event_subscriber, ts_ctr );
+const metrics = new MetricsCollector( prom_conf, event_subscriber );
// Instantiate classes for processor
const db = _createDB( db_conf );
@@ -69,11 +75,104 @@ const processor = new DeltaProcessor( dao, publisher, event_dispatcher );
// If the dao intializes successfully then process on a two second interval
const interval_ms = 2000;
+let process_interval: NodeJS.Timer;
+
dao.init()
-.then( _ => { setInterval( () => { processor.process(); }, interval_ms ); } )
+.then( _ =>
+{
+ publisher.connect();
+} )
+.then( _ =>
+{
+ const pidPath = __dirname + '/../conf/.delta_processor.pid';
+
+ writePidFile(pidPath );
+ greet( 'Liza Delta Processor', pidPath );
+
+ process_interval = setInterval(
+ () =>
+ {
+ processor.process();
+ metrics.checkForErrors( dao );
+ },
+ interval_ms,
+ );
+} )
.catch( err => { console.error( 'Mongo Error: ' + err ); } );
+/**
+ * Output greeting
+ *
+ * The greeting contains the program name and PID file path.
+ *
+ * @param name - program name
+ * @param pid_path - path to PID file
+ */
+function greet( name: string, pid_path: string ): void
+{
+ console.log( `${name}`);
+ console.log( `PID file: ${pid_path}` );
+}
+
+
+/**
+ * Write process id (PID) file
+ *
+ * @param pid_path - path to pid file
+ */
+function writePidFile( pid_path: string ): void
+{
+ fs.writeFileSync( pid_path, process.pid );
+
+ process.on( 'SIGINT', function()
+ {
+ shutdown( 'SIGINT' );
+ } )
+ .on( 'SIGTERM', function()
+ {
+ shutdown( 'SIGTERM' );
+ } )
+ .on( 'exit', () =>
+ {
+ fs.unlink( pid_path, () => {} );
+ } );
+}
+
+
+/**
+ * Perform a graceful shutdown
+ *
+ * @param signal - the signal that caused the shutdown
+ */
+function shutdown( signal: string ): void
+{
+ console.log( "Received " + signal + ". Beginning graceful shutdown:" );
+
+ console.log( "...Stopping processing interval" );
+
+ clearInterval( process_interval );
+
+ console.log( "...Closing MongoDb connection" );
+
+ db.close( ( err, _data ) =>
+ {
+ if ( err )
+ {
+ console.error( " Error closing connection: " + err );
+ }
+ } );
+
+ console.log( "...Closing AMQP connection..." );
+
+ publisher.close();
+
+ console.log( "Shutdown complete. Exiting." );
+
+ process.exit();
+}
+
+
/** Timestamp constructor
*
* @return a timestamp
@@ -95,7 +194,7 @@ function _createDB( conf: MongoDbConfig ): MongoDb
{
if( conf.ha )
{
- var mongodbPort = conf.port || MongoConnection.DEFAULT_PORT;
+ var mongodbPort = conf.port || 27017;
var mongodbReplSet = conf.replset || 'rs0';
var dbServers = new ReplSetServers(
[
@@ -109,7 +208,7 @@ function _createDB( conf: MongoDbConfig ): MongoDb
{
var dbServers = new MongoServer(
conf.host || '127.0.0.1',
- conf.port || MongoConnection.DEFAULT_PORT,
+ conf.port || 27017,
{auto_reconnect: true}
);
}
@@ -155,14 +254,31 @@ function _getAmqpConfig( env: any ): AmqpConfig
{
return {
"protocol": "amqp",
- "hostname": env.hostname,
- "port": +( env.port || 0 ),
- "username": env.username,
- "password": env.password,
+ "hostname": env.amqp_hostname,
+ "port": +( env.amqp_port || 0 ),
+ "username": env.amqp_username,
+ "password": env.amqp_password,
"locale": "en_US",
"frameMax": 0,
"heartbeat": 0,
- "vhost": env.vhost,
- "exchange": env.exchange,
+ "vhost": env.amqp_vhost,
+ "exchange": env.amqp_exchange,
+ };
+}
+
+
+/**
+ * Create a prometheus configuration from the environment
+ *
+ * @param env - the environment variables
+ *
+ * @return the prometheus configuration
+ */
+function _getPrometheusConfig( env: any ): PrometheusConfig
+{
+ return {
+ "hostname": env.prom_hostname,
+ "port": +( env.prom_port || 0 ),
+ "env": process.env.NODE_ENV,
};
}
\ No newline at end of file
diff --git a/package.json.in b/package.json.in
index 0aea8e0..898ec54 100644
--- a/package.json.in
+++ b/package.json.in
@@ -30,7 +30,7 @@
},
"devDependencies": {
"typescript": "~3.7",
- "@types/node": "@TS_NODE_VERSION@",
+ "@types/node": "12.12.11",
"chai": ">=1.9.1 < 4",
"@types/chai": ">=1.9.1 < 4",
"chai-as-promised": "7.1.0",
@@ -40,7 +40,8 @@
"sinon": ">=1.17.4",
"es6-promise": "~3",
"@types/amqplib": "0.5.13",
- "avro-js": "1.9.1"
+ "avro-js": "1.9.1",
+ "prom-client": "11.0.0"
},
"licenses": [
diff --git a/src/system/AmqpPublisher.ts b/src/system/AmqpPublisher.ts
index 019ea0d..622d659 100644
--- a/src/system/AmqpPublisher.ts
+++ b/src/system/AmqpPublisher.ts
@@ -38,5 +38,5 @@ export interface AmqpPublisher
*
* @param delta - The delta to publish
*/
- publish( delta: DeltaResult ): Promise;
+ publish( delta: DeltaResult ): Promise;
}
diff --git a/src/system/DeltaLogger.ts b/src/system/DeltaLogger.ts
index 50e616e..f286387 100644
--- a/src/system/DeltaLogger.ts
+++ b/src/system/DeltaLogger.ts
@@ -55,7 +55,9 @@ export class DeltaLogger
private readonly _env: string,
private readonly _subscriber: EventSubscriber,
private readonly _ts_ctr : () => UnixTimestamp,
- ) {}
+ ) {
+ this.init();
+ }
/**
@@ -65,7 +67,7 @@ export class DeltaLogger
{
this._registerEvent( 'document-processed', LogLevel.NOTICE );
this._registerEvent( 'delta-publish', LogLevel.NOTICE );
- this._registerEvent( 'avro-parse-err', LogLevel.ERROR );
+ this._registerEvent( 'avro-err', LogLevel.ERROR );
this._registerEvent( 'mongodb-err', LogLevel.ERROR );
this._registerEvent( 'publish-err', LogLevel.ERROR );
}
diff --git a/src/system/DeltaProcessor.ts b/src/system/DeltaProcessor.ts
index 39cdf3d..db5be5b 100644
--- a/src/system/DeltaProcessor.ts
+++ b/src/system/DeltaProcessor.ts
@@ -59,7 +59,7 @@ export class DeltaProcessor
{
let self = this;
- this._dao.getUnprocessedDocuments()
+ self._dao.getUnprocessedDocuments()
.then( docs =>
{
docs.forEach( doc =>
@@ -68,23 +68,50 @@ export class DeltaProcessor
const doc_id: DocumentId = doc.id;
const last_updated_ts = doc.lastUpdate;
- deltas.forEach( delta =>
+ for ( let i = 0; i < deltas.length; i++ )
{
+ const delta = deltas[ i ];
+ const startTime = process.hrtime();
+ let error = null;
+
self._publisher.publish( delta )
.then( _ =>
{
self._dao.advanceDeltaIndex( doc_id, delta.type );
} )
- .catch( _ =>
+ .catch( err =>
{
- // TODO: blow up?
+ self._dao.setErrorFlag( doc_id );
+
+ error = err;
} );
- });
+
+ // Do not process any more deltas for
+ // this document if there was an error
+ if ( error )
+ {
+ self._dispatcher.dispatch(
+ 'delta-process-error',
+ error
+ );
+
+ return;
+ }
+ else
+ {
+ const elapsedTime = process.hrtime( startTime );
+
+ self._dispatcher.dispatch(
+ 'delta-process-complete',
+ elapsedTime[ 1 ] / 10000
+ );
+ }
+ };
self._dao.markDocumentAsProcessed( doc_id, last_updated_ts )
.then( _ =>
{
- this._dispatcher.dispatch(
+ self._dispatcher.dispatch(
'document-processed',
'Deltas on document ' + doc_id + ' processed '
+ 'successfully. Document has been marked as '
@@ -93,13 +120,13 @@ export class DeltaProcessor
} )
.catch( err =>
{
- this._dispatcher.dispatch( 'mongodb-err', err );
+ self._dispatcher.dispatch( 'mongodb-err', err );
} );
- });
+ } );
} )
.catch( err =>
{
- this._dispatcher.dispatch( 'mongodb-err', err );
+ self._dispatcher.dispatch( 'mongodb-err', err );
} );
}
@@ -137,14 +164,15 @@ export class DeltaProcessor
const deltas: DeltaResult[] = deltas_obj[ type ] || [];
// Get type specific delta index
- let last_published_index = 0;
- if ( doc.lastPublishDelta )
+ let published_count = 0;
+ if ( doc.totalPublishDelta )
{
- last_published_index = doc.lastPublishDelta[ type ] || 0;
+ published_count = doc.totalPublishDelta[ type ] || 0;
}
// Only return the unprocessed deltas
- const deltas_trimmed = deltas.slice( last_published_index );
+ console.log( published_count );
+ const deltas_trimmed = deltas.slice( published_count );
// Mark each delta with its type
deltas_trimmed.forEach( delta =>
diff --git a/src/system/DeltaPublisher.ts b/src/system/DeltaPublisher.ts
index c67cde4..c718756 100644
--- a/src/system/DeltaPublisher.ts
+++ b/src/system/DeltaPublisher.ts
@@ -27,7 +27,8 @@ import { EventDispatcher } from './event/EventDispatcher';
import {
connect as amqpConnect,
Options,
- Channel
+ Channel,
+ Connection,
} from 'amqplib';
const avro = require( 'avro-js' );
@@ -38,8 +39,23 @@ export interface AmqpConfig extends Options.Connect {
}
+export interface AvroSchema {
+ /** Write data to a buffer */
+ toBuffer( data: Record ): Buffer | null;
+}
+
+
export class DeltaPublisher implements AmqpPublisher
{
+ /** The amqp connection */
+ private _conn?: Connection;
+
+ /** The amqp channel */
+ private _channel?: Channel;
+
+ /** The avro schema */
+ private _type?: AvroSchema;
+
/** The path to the avro schema */
readonly SCHEMA_PATH = __dirname + '/avro/schema.avsc';
@@ -51,7 +67,7 @@ export class DeltaPublisher implements AmqpPublisher
/**
- * Initialize publisher
+ * Delta publisher
*
* @param _conf - amqp configuration
* @param _emitter - event emitter instance
@@ -61,7 +77,57 @@ export class DeltaPublisher implements AmqpPublisher
private readonly _conf: AmqpConfig,
private readonly _dispatcher: EventDispatcher,
private readonly _ts_ctr : () => UnixTimestamp,
- ) {}
+ ) {
+ this._type = avro.parse( this.SCHEMA_PATH );
+ }
+
+
+ /**
+ * Initialize connection
+ */
+ connect(): Promise
+ {
+ return new Promise( ( resolve, reject ) =>
+ {
+ amqpConnect( this._conf )
+ .then( conn =>
+ {
+ this._conn = conn;
+
+ return this._conn.createChannel();
+ } )
+ .then( ( ch: Channel ) =>
+ {
+ this._channel = ch;
+
+ this._channel.assertExchange(
+ this._conf.exchange,
+ 'fanout',
+ { durable: true }
+ );
+
+ resolve();
+ return;
+ } )
+ .catch( e =>
+ {
+ reject( e );
+ return;
+ } );
+ } );
+ }
+
+
+ /**
+ * Close the amqp conenction
+ */
+ close(): void
+ {
+ if ( this._conn )
+ {
+ this._conn.close.bind(this._conn);
+ }
+ }
/**
@@ -71,57 +137,34 @@ export class DeltaPublisher implements AmqpPublisher
*
* @return whether the message was published successfully
*/
- publish( delta: DeltaResult ): Promise
+ publish( delta: DeltaResult ): Promise
{
- const exchange = this._conf.exchange;
-
- return new Promise( ( resolve, reject ) =>
+ return new Promise( ( resolve, reject ) =>
{
- amqpConnect( this._conf )
- .then( conn =>
- {
- setTimeout( () => conn.close(), 10000 );
- return conn.createChannel();
- } )
- .then( ch =>
- {
- ch.assertExchange( exchange, 'fanout', { durable: true } );
+ const startTime = process.hrtime();
- return this.sendMessage( ch, exchange, delta );
- } )
- .then( sentSuccessfully =>
+ this.sendMessage( delta )
+ .then( _ =>
{
- console.log('sentSuccessfully', sentSuccessfully);
- if ( sentSuccessfully )
- {
- this._dispatcher.dispatch(
- 'delta-publish',
- "Published " + delta.type + " delta with ts '"
- + delta.timestamp + "' to '" + exchange
- + '" exchange',
- );
+ this._dispatcher.dispatch(
+ 'delta-publish',
+ "Published " + delta.type + " delta with ts '"
+ + delta.timestamp + "' to '" + this._conf.exchange
+ + '" exchange',
+ );
- resolve();
- }
- else
- {
- this._dispatcher.dispatch(
- 'publish-err',
- "Error publishing " + delta.type + " delta with ts '"
- + delta.timestamp + "' to '" + exchange
- + "' exchange",
- );
-
- reject();
- }
+ console.log('#publish: '
+ + process.hrtime( startTime )[0] / 10000 );
+ resolve();
+ return;
} )
.catch( e =>
{
this._dispatcher.dispatch(
'publish-err',
"Error publishing " + delta.type + " delta with ts '"
- + delta.timestamp + '" to "' + exchange + "' exchange '"
- + e,
+ + delta.timestamp + '" to "' + this._conf.exchange
+ + "' exchange: '" + e,
)
reject();
@@ -133,75 +176,92 @@ export class DeltaPublisher implements AmqpPublisher
/**
* Send message to exchange
*
- * @param channel - AMQP channel
- * @param exchange - exchange name
- * @param delta - The delta to publish
+ * @param delta - The delta to publish
*
* @return whether publish was successful
*/
- sendMessage(
- channel: Channel,
- exchange: string,
- delta: DeltaResult,
- ): boolean
+ sendMessage( delta: DeltaResult ): Promise
{
- const headers = {
- version: 1,
- created: Date.now(),
- };
-
- // Convert all delta datums to string for avro
- const delta_data = this.avroFormat( delta.data );
- const event_id = this.DELTA_MAP[ delta.type ];
-
- const data = {
- event: {
- id: event_id,
- ts: this._ts_ctr(),
- actor: 'SERVER',
- step: null,
- },
- document: {
- id: 123123, // Fix
- },
- session: {
- entity_name: 'Foobar', // Fix
- entity_id: 123123, // Fix
- },
- data: {
- Data: {
- bucket: delta_data,
- },
- },
- delta: {
- Data: {
- bucket: delta_data,
- },
- },
- program: {
- Program: {
- id: 'quote_server',
- version: 'dadaddwafdwa', // Fix
- },
- },
- };
-
- const avro_buffer = this.avroEncode( data );
-
- if ( !avro_buffer )
+ return new Promise( ( resolve, reject ) =>
{
- return false;
- }
+ const startTime = process.hrtime();
- // we don't use a routing key; fanout exchange
- const routing_key = '';
+ const ts = this._ts_ctr();
+ const headers = { version: 1, created: ts };
+ const delta_data = this.avroFormat( delta.data );
+ console.log('#sendmessage 1: '
+ + (process.hrtime( startTime )[ 1 ] / 10000) + 'ms');
+ const event_id = this.DELTA_MAP[ delta.type ];
+ const avro_buffer = this.avroEncode( {
+ event: {
+ id: event_id,
+ ts: ts,
+ actor: 'SERVER',
+ step: null,
+ },
+ document: {
+ id: 123123, // Fix
+ },
+ session: {
+ entity_name: 'Foobar', // Fix
+ entity_id: 123123, // Fix
+ },
+ data: {
+ Data: {
+ bucket: delta_data,
+ },
+ },
+ delta: {
+ Data: {
+ bucket: delta_data,
+ },
+ },
+ program: {
+ Program: {
+ id: 'quote_server',
+ version: 'dadaddwafdwa', // Fix
+ },
+ },
+ } );
+ console.log('#sendmessage 2: '
+ + (process.hrtime( startTime )[ 1 ] / 10000) + 'ms');
- return channel.publish(
- exchange,
- routing_key,
- avro_buffer,
- { headers: headers },
- );
+ if ( !this._conn )
+ {
+ reject( 'Error sending message: No connection' );
+ return;
+ }
+ else if ( !this._channel )
+ {
+ reject( 'Error sending message: No channel' );
+ return;
+ }
+ else if ( !avro_buffer )
+ {
+ reject( 'Error sending message: No avro buffer' );
+ return;
+ }
+ console.log('#sendmessage 3: '
+ + (process.hrtime( startTime )[ 1 ] / 10000) + 'ms');
+
+ // we don't use a routing key; fanout exchange
+ const published_successfully = this._channel.publish(
+ this._conf.exchange,
+ '',
+ avro_buffer,
+ { headers: headers },
+ );
+
+ if ( published_successfully )
+ {
+ console.log('#sendmessage 4: '
+ + (process.hrtime( startTime )[ 1 ] / 10000) + 'ms');
+ resolve();
+ return;
+ }
+
+ reject( 'Error sending message: publishing failed' );
+ } );
}
@@ -218,13 +278,22 @@ export class DeltaPublisher implements AmqpPublisher
try
{
- const type = avro.parse( this.SCHEMA_PATH );
- buffer = type.toBuffer( data );
+ if ( !this._type )
+ {
+ this._dispatcher.dispatch(
+ 'avro-err',
+ 'No avro scheama found',
+ );
+
+ return null;
+ }
+
+ buffer = this._type.toBuffer( data );
}
catch( e )
{
this._dispatcher.dispatch(
- 'avro-parse-err',
+ 'avro-err',
'Error encoding data to avro: ' + e,
);
}
diff --git a/src/system/MetricsCollector.ts b/src/system/MetricsCollector.ts
index 583b5a6..2d7ca24 100644
--- a/src/system/MetricsCollector.ts
+++ b/src/system/MetricsCollector.ts
@@ -22,41 +22,177 @@
*/
import { EventSubscriber } from "./event/EventSubscriber";
+import { DeltaDao } from "./db/DeltaDao";
+import { PositiveInteger } from "../numeric";
+import { Histogram, Pushgateway, Counter, Gauge } from 'prom-client';
-const client = require('prom-client');
+const client = require( 'prom-client' );
-declare type MetricStructure = {
- path: string;
- code: number;
- service: string;
- env: string;
+
+// declare type MetricStructure = {
+// path: string;
+// code: number;
+// service: string;
+// env: string;
+// }
+
+
+export declare type PrometheusConfig = {
+ /** The hostname to connect to */
+ hostname: string;
+
+ /** The port to connect to */
+ port: number;
+
+ /** The environment ( dev, test, demo, live ) */
+ env: string;
}
+
export class MetricsCollector
{
+ /** The prometheus PushGateway */
+ private _gateway: Pushgateway;
+
+ /** Metric push interval */
+ private _push_interval_ms: PositiveInteger = 5000;
+
+ /** Delta processed time histogram */
+ private _process_time_hist: Histogram;
+ private _process_time_params: Pushgateway.Parameters = {
+ jobName: 'liza_delta_process_time'
+ };
+
+ /** Delta error counter */
+ private _process_error_count: Counter;
+ private _process_error_params: Pushgateway.Parameters = {
+ jobName: 'liza_delta_error'
+ };
+
+ /** Delta current error gauge */
+ private _current_error_gauge: Gauge;
+ private _current_error_params: Pushgateway.Parameters = {
+ jobName: 'liza_delta_current_error'
+ };
+
+ /** Delta error counter */
+ private _process_delta_count: Counter;
+ private _process_delta_params: Pushgateway.Parameters = {
+ jobName: 'liza_delta_success'
+ };
+
/**
* Initialize delta logger
+ *
+ * @param _conf - the prometheus configuration
+ * @param _subscriber - the event subscriber
*/
constructor(
- private readonly _env: string,
+ private readonly _conf: PrometheusConfig,
private readonly _subscriber: EventSubscriber,
- ) {}
+ ) {
+ // Set labels
+ const default_labels = {
+ env: this._conf.env,
+ service: 'delta_processor',
+ };
+
+ client.register.setDefaultLabels( default_labels );
+
+ // Create gateway
+ const url = 'http://' + this._conf.hostname + ':' + this._conf.port;
+ this._gateway = new client.Pushgateway( url );
+
+ // Create metrics
+ this._process_time_hist = new client.Histogram( {
+ name: this._process_time_params.jobName,
+ help: 'Time in ms for deltas to be processed',
+ labelNames: [ 'env', 'service' ],
+ buckets: client.linearBuckets(0, 10, 10),
+ } );
+
+ this._process_error_count = new client.Counter( {
+ name: this._process_error_params.jobName,
+ help: 'Error count for deltas being processed',
+ labelNames: [ 'env', 'service' ],
+ } );
+
+ this._current_error_gauge = new client.Gauge( {
+ name: this._current_error_params.jobName,
+ help: 'The current number of documents in an error state',
+ labelNames: [ 'env', 'service' ],
+ } );
+
+ this._process_delta_count = new client.Counter( {
+ name: this._process_delta_params.jobName,
+ help: 'Count of deltas successfully processed',
+ labelNames: [ 'env', 'service' ],
+ } );
+
+ // Push metrics on a specific intervals
+ setInterval( () => { this.pushMetrics(); }, this._push_interval_ms );
+
+ // Subsribe metrics to events
+ this.subscribeMetrics();
+ }
/**
- * Initialize the logger to look for specific events
+ * Subscribe metrics
*/
- init(): void
+ private subscribeMetrics()
{
- const collectDefaultMetrics = client.collectDefaultMetrics;
+ this._subscriber.subscribe(
+ 'delta-process-complete',
+ ( val ) =>
+ {
+ console.log( 'Got time: ' + val + 'ms' );
+ this._process_time_hist.observe( val );
+ this._process_delta_count.inc();
+ }
+ );
- console.log( this._subscriber, collectDefaultMetrics)
- this._formatLog( '', 123 );
- // this._registerEvent( 'document-processed', LogLevel.NOTICE );
- // this._registerEvent( 'delta-publish', LogLevel.NOTICE );
- // this._registerEvent( 'avro-parse-err', LogLevel.ERROR );
- // this._registerEvent( 'mongodb-err', LogLevel.ERROR );
- // this._registerEvent( 'publish-err', LogLevel.ERROR );
+ this._subscriber.subscribe(
+ 'delta-process-error',
+ ( _ ) => this._process_error_count.inc()
+ );
+ }
+
+
+ /**
+ * Push metrics to Prometheus PushGateway
+ */
+ private pushMetrics(): void
+ {
+ // this._gateway.pushAdd( this._process_time_params, this.pushCallback );
+ // this._gateway.pushAdd( this._process_error_params, this.pushCallback );
+ // this._gateway.pushAdd( this._current_error_params, this.pushCallback );
+ // this._gateway.pushAdd( this._process_delta_params, this.pushCallback );
+
+ this._gateway.pushAdd(
+ {
+ jobName: 'liza_delta_metrics'
+ },
+ this.pushCallback
+ );
+ }
+
+
+ /**
+ * Handle push error
+ *
+ * @param error - Any errors that occurred
+ * @param response - The http response
+ * @param body - The resposne body
+ */
+ private pushCallback(
+ _error?: Error | undefined,
+ _response?: any,
+ _body?: any
+ ): void
+ {
+ // console.log( 'Push callback' );
+ // console.error( error, response, body );
}
@@ -68,13 +204,35 @@ export class MetricsCollector
*
* @returns a structured logging object
*/
- private _formatLog( path: string, code: number ): MetricStructure
+ // private _formatMetricVal( label: string, val: any ): MetricStructure
+ // {
+ // return {
+ // path: path,
+ // code: code,
+ // service: 'quote-server',
+ // env: this._conf.env,
+ // };
+ // }
+
+
+ /**
+ * Look for mongodb delta errors and update metrics if found
+ *
+ * @return any errors the occurred
+ */
+ checkForErrors( dao: DeltaDao ): NullableError
{
- return {
- path: path,
- code: code,
- service: 'quote-server',
- env: this._env,
- };
+ dao.getErrorCount()
+ .then( count =>
+ {
+ // console.log( 'Error count: ', count );
+ this._current_error_gauge.set( +count );
+ } )
+ .catch( err =>
+ {
+ return err;
+ } );
+
+ return null;
}
}
diff --git a/src/system/db/DeltaDao.ts b/src/system/db/DeltaDao.ts
index 64e1f0f..173fcee 100644
--- a/src/system/db/DeltaDao.ts
+++ b/src/system/db/DeltaDao.ts
@@ -47,7 +47,7 @@ export interface DeltaDao
* @param doc_id - Document whose index will be set
* @param type - Delta type
*
- * @return any errors that occured
+ * @return any errors that occurred
*/
advanceDeltaIndex(
doc_id: DocumentId,
@@ -62,11 +62,29 @@ export interface DeltaDao
* @param doc_id - The document to mark
* @param last_update_ts - The last time this document was updated
*
- * @return true if the document was successfully marked as processed
+ * @return any errors that occurred
*/
markDocumentAsProcessed(
doc_id: DocumentId,
last_update_ts: UnixTimestamp,
): Promise
+
+
+ /**
+ * Flag the document as being in an error state
+ *
+ * @param doc_id - The document to flag
+ *
+ * @return any errors that occurred
+ */
+ setErrorFlag( doc_id: DocumentId ): Promise
+
+
+ /**
+ * Get a count of documents in an error state
+ *
+ * @return a count of the documents in an error state
+ */
+ getErrorCount(): Promise
}
diff --git a/src/system/db/MongoDeltaDao.ts b/src/system/db/MongoDeltaDao.ts
index 443ae85..81816e0 100644
--- a/src/system/db/MongoDeltaDao.ts
+++ b/src/system/db/MongoDeltaDao.ts
@@ -59,7 +59,7 @@ export class MongoDeltaDao implements DeltaDao
*
* connectError event will be emitted on failure.
*
- * @return any errors that occured
+ * @return any errors that occurred
*/
init(): Promise
{
@@ -73,14 +73,18 @@ export class MongoDeltaDao implements DeltaDao
// if there was an error, don't bother with anything else
if ( err )
{
- // in some circumstances, it may just be telling us that we're
- // already connected (even though the connection may have been
- // broken)
+ // in some circumstances, it may just be telling us that
+ // we're already connected (even though the connection may
+ // have been broken)
if ( err.errno !== undefined )
{
reject( 'Error opening mongo connection: ' + err );
return;
}
+ } else if ( db == null )
+ {
+ reject( 'No database connection' );
+ return;
}
// quotes collection
@@ -116,7 +120,7 @@ export class MongoDeltaDao implements DeltaDao
);
}
);
- });
+ } );
} );
}
@@ -165,8 +169,10 @@ export class MongoDeltaDao implements DeltaDao
/**
* Set the document's processed index
*
- * @param doc_id - Document whose index will be set
- * @param type - Delta type
+ * @param doc_id - Document whose index will be set
+ * @param type - Delta type
+ *
+ * @return any errors that occurred
*/
advanceDeltaIndex(
doc_id: DocumentId,
@@ -177,7 +183,7 @@ export class MongoDeltaDao implements DeltaDao
{
const inc_data: Record = {};
- inc_data[ 'lastPublishDelta.' + type ] = 1;
+ inc_data[ 'totalPublishDelta.' + type ] = 1;
this._collection!.update(
{ id: doc_id },
@@ -206,7 +212,7 @@ export class MongoDeltaDao implements DeltaDao
* @param doc_id - The document to mark
* @param last_update_ts - The last time this document was updated
*
- * @return true if the document was successfully marked as processed
+ * @return any errors that occurred
*/
markDocumentAsProcessed(
doc_id: DocumentId,
@@ -231,7 +237,74 @@ export class MongoDeltaDao implements DeltaDao
return;
}
);
+ } );
+ }
+
+ /**
+ * Flag the document as being in an error state
+ *
+ * @param doc_id - The document to flag
+ *
+ * @return any errors that occurred
+ */
+ setErrorFlag( doc_id: DocumentId ): Promise
+ {
+ return new Promise( ( resolve, reject ) =>
+ {
+ this._collection!.update(
+ { id: doc_id },
+ { $set: { deltaError: true } },
+ { upsert: false },
+ function( err )
+ {
+ if ( err )
+ {
+ reject( "Failed setting error flag: " + err );
+ return;
+ }
+
+ resolve();
+ return;
+ }
+ );
+ } );
+ }
+
+
+ /**
+ * Get a count of documents in an error state
+ *
+ * @return a count of the documents in an error state
+ */
+ getErrorCount(): Promise
+ {
+ return new Promise( ( resolve, reject ) =>
+ {
+ this._collection!.find(
+ { deltaError: true },
+ {},
+ function( err, cursor )
+ {
+ if ( err )
+ {
+ reject( err );
+ return;
+ }
+
+ cursor.toArray( function( err: NullableError, data: any[] )
+ {
+ if ( err )
+ {
+ reject( err );
+ return;
+ }
+
+ // return the count
+ resolve( data.length );
+ });
+ }
+ )
} );
}
}
diff --git a/src/types/mongodb.d.ts b/src/types/mongodb.d.ts
index 61d764e..968fa67 100644
--- a/src/types/mongodb.d.ts
+++ b/src/types/mongodb.d.ts
@@ -51,6 +51,14 @@ export interface MongoDb
* @param callback continuation on completion
*/
open( callback: MongoCallback ): void;
+
+
+ /**
+ * Close the database connection
+ *
+ * @param callback continuation on completion
+ */
+ close( callback: MongoCallback ): void;
}
diff --git a/test/system/DeltaProcessorTest.ts b/test/system/DeltaProcessorTest.ts
index d32197e..bccd4a4 100644
--- a/test/system/DeltaProcessorTest.ts
+++ b/test/system/DeltaProcessorTest.ts
@@ -241,7 +241,7 @@ describe( 'system.DeltaProcessor', () =>
},
],
},
- lastPublishDelta: {
+ totalPublishDelta: {
data: 0,
},
},
@@ -274,7 +274,7 @@ describe( 'system.DeltaProcessor', () =>
},
],
},
- lastPublishDelta: {
+ totalPublishDelta: {
data: 1,
},
},
@@ -308,6 +308,8 @@ function createMockDeltaDao(): DeltaDao
getUnprocessedDocuments() { return Promise.resolve( [] ); },
advanceDeltaIndex() { return Promise.resolve( null ); },
markDocumentAsProcessed() { return Promise.resolve( null ); },
+ setErrorFlag() { return Promise.resolve( null ); },
+ getErrorCount() { return Promise.resolve( 0 ); },
};
}