1
0
Fork 0

[DEV-5312] Add signal handling and prometheus monitoring

master
Austin Schaffer 2019-11-21 15:59:17 -05:00
parent 9b5cd4e89f
commit faa7e15760
11 changed files with 660 additions and 185 deletions

View File

@ -18,6 +18,7 @@
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
import fs = require( 'fs' );
import { AmqpConfig } from "../src/system/AmqpPublisher";
import { MongoDeltaDao } from "../src/system/db/MongoDeltaDao";
@ -28,27 +29,32 @@ import { DeltaLogger } from "../src/system/DeltaLogger";
import { EventEmitter } from "events";
import { EventDispatcher } from "../src/system/event/EventDispatcher";
import { EventSubscriber } from "../src/system/event/EventSubscriber";
// import { MetricsCollector } from "../src/system/MetricsCollector";
import {
MetricsCollector,
PrometheusConfig,
} from "../src/system/MetricsCollector";
const {
Db: MongoDb,
Server: MongoServer,
Connection: MongoConnection,
ReplServers: ReplSetServers,
} = require( 'mongodb/lib/mongodb' );
} = require( 'mongodb' );
// TODO: fix this
process.env.hostname = 'localhost';
process.env.port = '5672';
process.env.username = 'quote_referral';
process.env.password = 'Et7iojahwo4aePie9Cahng7Chu5eim4E';
process.env.vhost = 'quote';
process.env.exchange = 'quoteupdate';
process.env.NODE_ENV = 'dev';
process.env.amqp_hostname = 'localhost';
process.env.amqp_port = '5672';
process.env.amqp_username = 'quote_referral';
process.env.amqp_password = 'Et7iojahwo4aePie9Cahng7Chu5eim4E';
process.env.amqp_vhost = 'quote';
process.env.amqp_exchange = 'quoteupdate';
process.env.prom_hostname = 'dmz2docker01.rsgcorp.local';
process.env.prom_port = '9091';
// Environment variables
const amqp_conf = _getAmqpConfig( process.env );
const db_conf = _getMongoConfig( process.env );
const prom_conf = _getPrometheusConfig( process.env );
const env = process.env.NODE_ENV || 'Unknown Environment';
// Event handling
@ -57,8 +63,8 @@ const event_dispatcher = new EventDispatcher( event_emitter );
const event_subscriber = new EventSubscriber( event_emitter );
// Event subscribers
new DeltaLogger( env, event_subscriber, ts_ctr ).init();
// new MetricsCollector( env, event_subscriber );
new DeltaLogger( env, event_subscriber, ts_ctr );
const metrics = new MetricsCollector( prom_conf, event_subscriber );
// Instantiate classes for processor
const db = _createDB( db_conf );
@ -69,11 +75,104 @@ const processor = new DeltaProcessor( dao, publisher, event_dispatcher );
// If the dao intializes successfully then process on a two second interval
const interval_ms = 2000;
let process_interval: NodeJS.Timer;
dao.init()
.then( _ => { setInterval( () => { processor.process(); }, interval_ms ); } )
.then( _ =>
{
publisher.connect();
} )
.then( _ =>
{
const pidPath = __dirname + '/../conf/.delta_processor.pid';
writePidFile(pidPath );
greet( 'Liza Delta Processor', pidPath );
process_interval = setInterval(
() =>
{
processor.process();
metrics.checkForErrors( dao );
},
interval_ms,
);
} )
.catch( err => { console.error( 'Mongo Error: ' + err ); } );
/**
* Output greeting
*
* The greeting contains the program name and PID file path.
*
* @param name - program name
* @param pid_path - path to PID file
*/
function greet( name: string, pid_path: string ): void
{
console.log( `${name}`);
console.log( `PID file: ${pid_path}` );
}
/**
* Write process id (PID) file
*
* @param pid_path - path to pid file
*/
function writePidFile( pid_path: string ): void
{
fs.writeFileSync( pid_path, process.pid );
process.on( 'SIGINT', function()
{
shutdown( 'SIGINT' );
} )
.on( 'SIGTERM', function()
{
shutdown( 'SIGTERM' );
} )
.on( 'exit', () =>
{
fs.unlink( pid_path, () => {} );
} );
}
/**
* Perform a graceful shutdown
*
* @param signal - the signal that caused the shutdown
*/
function shutdown( signal: string ): void
{
console.log( "Received " + signal + ". Beginning graceful shutdown:" );
console.log( "...Stopping processing interval" );
clearInterval( process_interval );
console.log( "...Closing MongoDb connection" );
db.close( ( err, _data ) =>
{
if ( err )
{
console.error( " Error closing connection: " + err );
}
} );
console.log( "...Closing AMQP connection..." );
publisher.close();
console.log( "Shutdown complete. Exiting." );
process.exit();
}
/** Timestamp constructor
*
* @return a timestamp
@ -95,7 +194,7 @@ function _createDB( conf: MongoDbConfig ): MongoDb
{
if( conf.ha )
{
var mongodbPort = conf.port || MongoConnection.DEFAULT_PORT;
var mongodbPort = conf.port || 27017;
var mongodbReplSet = conf.replset || 'rs0';
var dbServers = new ReplSetServers(
[
@ -109,7 +208,7 @@ function _createDB( conf: MongoDbConfig ): MongoDb
{
var dbServers = new MongoServer(
conf.host || '127.0.0.1',
conf.port || MongoConnection.DEFAULT_PORT,
conf.port || 27017,
{auto_reconnect: true}
);
}
@ -155,14 +254,31 @@ function _getAmqpConfig( env: any ): AmqpConfig
{
return <AmqpConfig>{
"protocol": "amqp",
"hostname": env.hostname,
"port": +( env.port || 0 ),
"username": env.username,
"password": env.password,
"hostname": env.amqp_hostname,
"port": +( env.amqp_port || 0 ),
"username": env.amqp_username,
"password": env.amqp_password,
"locale": "en_US",
"frameMax": 0,
"heartbeat": 0,
"vhost": env.vhost,
"exchange": env.exchange,
"vhost": env.amqp_vhost,
"exchange": env.amqp_exchange,
};
}
/**
* Create a prometheus configuration from the environment
*
* @param env - the environment variables
*
* @return the prometheus configuration
*/
function _getPrometheusConfig( env: any ): PrometheusConfig
{
return <PrometheusConfig>{
"hostname": env.prom_hostname,
"port": +( env.prom_port || 0 ),
"env": process.env.NODE_ENV,
};
}

View File

@ -30,7 +30,7 @@
},
"devDependencies": {
"typescript": "~3.7",
"@types/node": "@TS_NODE_VERSION@",
"@types/node": "12.12.11",
"chai": ">=1.9.1 < 4",
"@types/chai": ">=1.9.1 < 4",
"chai-as-promised": "7.1.0",
@ -40,7 +40,8 @@
"sinon": ">=1.17.4",
"es6-promise": "~3",
"@types/amqplib": "0.5.13",
"avro-js": "1.9.1"
"avro-js": "1.9.1",
"prom-client": "11.0.0"
},
"licenses": [

View File

@ -38,5 +38,5 @@ export interface AmqpPublisher
*
* @param delta - The delta to publish
*/
publish( delta: DeltaResult<any> ): Promise<null>;
publish( delta: DeltaResult<any> ): Promise<NullableError>;
}

View File

@ -55,7 +55,9 @@ export class DeltaLogger
private readonly _env: string,
private readonly _subscriber: EventSubscriber,
private readonly _ts_ctr : () => UnixTimestamp,
) {}
) {
this.init();
}
/**
@ -65,7 +67,7 @@ export class DeltaLogger
{
this._registerEvent( 'document-processed', LogLevel.NOTICE );
this._registerEvent( 'delta-publish', LogLevel.NOTICE );
this._registerEvent( 'avro-parse-err', LogLevel.ERROR );
this._registerEvent( 'avro-err', LogLevel.ERROR );
this._registerEvent( 'mongodb-err', LogLevel.ERROR );
this._registerEvent( 'publish-err', LogLevel.ERROR );
}

View File

@ -59,7 +59,7 @@ export class DeltaProcessor
{
let self = this;
this._dao.getUnprocessedDocuments()
self._dao.getUnprocessedDocuments()
.then( docs =>
{
docs.forEach( doc =>
@ -68,23 +68,50 @@ export class DeltaProcessor
const doc_id: DocumentId = doc.id;
const last_updated_ts = doc.lastUpdate;
deltas.forEach( delta =>
for ( let i = 0; i < deltas.length; i++ )
{
const delta = deltas[ i ];
const startTime = process.hrtime();
let error = null;
self._publisher.publish( delta )
.then( _ =>
{
self._dao.advanceDeltaIndex( doc_id, delta.type );
} )
.catch( _ =>
.catch( err =>
{
// TODO: blow up?
self._dao.setErrorFlag( doc_id );
error = err;
} );
});
// Do not process any more deltas for
// this document if there was an error
if ( error )
{
self._dispatcher.dispatch(
'delta-process-error',
error
);
return;
}
else
{
const elapsedTime = process.hrtime( startTime );
self._dispatcher.dispatch(
'delta-process-complete',
elapsedTime[ 1 ] / 10000
);
}
};
self._dao.markDocumentAsProcessed( doc_id, last_updated_ts )
.then( _ =>
{
this._dispatcher.dispatch(
self._dispatcher.dispatch(
'document-processed',
'Deltas on document ' + doc_id + ' processed '
+ 'successfully. Document has been marked as '
@ -93,13 +120,13 @@ export class DeltaProcessor
} )
.catch( err =>
{
this._dispatcher.dispatch( 'mongodb-err', err );
self._dispatcher.dispatch( 'mongodb-err', err );
} );
});
} );
} )
.catch( err =>
{
this._dispatcher.dispatch( 'mongodb-err', err );
self._dispatcher.dispatch( 'mongodb-err', err );
} );
}
@ -137,14 +164,15 @@ export class DeltaProcessor
const deltas: DeltaResult<any>[] = deltas_obj[ type ] || [];
// Get type specific delta index
let last_published_index = 0;
if ( doc.lastPublishDelta )
let published_count = 0;
if ( doc.totalPublishDelta )
{
last_published_index = doc.lastPublishDelta[ type ] || 0;
published_count = doc.totalPublishDelta[ type ] || 0;
}
// Only return the unprocessed deltas
const deltas_trimmed = deltas.slice( last_published_index );
console.log( published_count );
const deltas_trimmed = deltas.slice( published_count );
// Mark each delta with its type
deltas_trimmed.forEach( delta =>

View File

@ -27,7 +27,8 @@ import { EventDispatcher } from './event/EventDispatcher';
import {
connect as amqpConnect,
Options,
Channel
Channel,
Connection,
} from 'amqplib';
const avro = require( 'avro-js' );
@ -38,8 +39,23 @@ export interface AmqpConfig extends Options.Connect {
}
export interface AvroSchema {
/** Write data to a buffer */
toBuffer( data: Record<string, any> ): Buffer | null;
}
export class DeltaPublisher implements AmqpPublisher
{
/** The amqp connection */
private _conn?: Connection;
/** The amqp channel */
private _channel?: Channel;
/** The avro schema */
private _type?: AvroSchema;
/** The path to the avro schema */
readonly SCHEMA_PATH = __dirname + '/avro/schema.avsc';
@ -51,7 +67,7 @@ export class DeltaPublisher implements AmqpPublisher
/**
* Initialize publisher
* Delta publisher
*
* @param _conf - amqp configuration
* @param _emitter - event emitter instance
@ -61,7 +77,57 @@ export class DeltaPublisher implements AmqpPublisher
private readonly _conf: AmqpConfig,
private readonly _dispatcher: EventDispatcher,
private readonly _ts_ctr : () => UnixTimestamp,
) {}
) {
this._type = avro.parse( this.SCHEMA_PATH );
}
/**
* Initialize connection
*/
connect(): Promise<NullableError>
{
return new Promise<null>( ( resolve, reject ) =>
{
amqpConnect( this._conf )
.then( conn =>
{
this._conn = conn;
return this._conn.createChannel();
} )
.then( ( ch: Channel ) =>
{
this._channel = ch;
this._channel.assertExchange(
this._conf.exchange,
'fanout',
{ durable: true }
);
resolve();
return;
} )
.catch( e =>
{
reject( e );
return;
} );
} );
}
/**
* Close the amqp conenction
*/
close(): void
{
if ( this._conn )
{
this._conn.close.bind(this._conn);
}
}
/**
@ -71,57 +137,34 @@ export class DeltaPublisher implements AmqpPublisher
*
* @return whether the message was published successfully
*/
publish( delta: DeltaResult<any> ): Promise<null>
publish( delta: DeltaResult<any> ): Promise<NullableError>
{
const exchange = this._conf.exchange;
return new Promise<null>( ( resolve, reject ) =>
return new Promise<NullableError>( ( resolve, reject ) =>
{
amqpConnect( this._conf )
.then( conn =>
{
setTimeout( () => conn.close(), 10000 );
return conn.createChannel();
} )
.then( ch =>
{
ch.assertExchange( exchange, 'fanout', { durable: true } );
const startTime = process.hrtime();
return this.sendMessage( ch, exchange, delta );
} )
.then( sentSuccessfully =>
this.sendMessage( delta )
.then( _ =>
{
console.log('sentSuccessfully', sentSuccessfully);
if ( sentSuccessfully )
{
this._dispatcher.dispatch(
'delta-publish',
"Published " + delta.type + " delta with ts '"
+ delta.timestamp + "' to '" + exchange
+ '" exchange',
);
this._dispatcher.dispatch(
'delta-publish',
"Published " + delta.type + " delta with ts '"
+ delta.timestamp + "' to '" + this._conf.exchange
+ '" exchange',
);
resolve();
}
else
{
this._dispatcher.dispatch(
'publish-err',
"Error publishing " + delta.type + " delta with ts '"
+ delta.timestamp + "' to '" + exchange
+ "' exchange",
);
reject();
}
console.log('#publish: '
+ process.hrtime( startTime )[0] / 10000 );
resolve();
return;
} )
.catch( e =>
{
this._dispatcher.dispatch(
'publish-err',
"Error publishing " + delta.type + " delta with ts '"
+ delta.timestamp + '" to "' + exchange + "' exchange '"
+ e,
+ delta.timestamp + '" to "' + this._conf.exchange
+ "' exchange: '" + e,
)
reject();
@ -133,75 +176,92 @@ export class DeltaPublisher implements AmqpPublisher
/**
* Send message to exchange
*
* @param channel - AMQP channel
* @param exchange - exchange name
* @param delta - The delta to publish
* @param delta - The delta to publish
*
* @return whether publish was successful
*/
sendMessage(
channel: Channel,
exchange: string,
delta: DeltaResult<any>,
): boolean
sendMessage( delta: DeltaResult<any> ): Promise<NullableError>
{
const headers = {
version: 1,
created: Date.now(),
};
// Convert all delta datums to string for avro
const delta_data = this.avroFormat( delta.data );
const event_id = this.DELTA_MAP[ delta.type ];
const data = {
event: {
id: event_id,
ts: this._ts_ctr(),
actor: 'SERVER',
step: null,
},
document: {
id: 123123, // Fix
},
session: {
entity_name: 'Foobar', // Fix
entity_id: 123123, // Fix
},
data: {
Data: {
bucket: delta_data,
},
},
delta: {
Data: {
bucket: delta_data,
},
},
program: {
Program: {
id: 'quote_server',
version: 'dadaddwafdwa', // Fix
},
},
};
const avro_buffer = this.avroEncode( data );
if ( !avro_buffer )
return new Promise<NullableError>( ( resolve, reject ) =>
{
return false;
}
const startTime = process.hrtime();
// we don't use a routing key; fanout exchange
const routing_key = '';
const ts = this._ts_ctr();
const headers = { version: 1, created: ts };
const delta_data = this.avroFormat( delta.data );
console.log('#sendmessage 1: '
+ (process.hrtime( startTime )[ 1 ] / 10000) + 'ms');
const event_id = this.DELTA_MAP[ delta.type ];
const avro_buffer = this.avroEncode( {
event: {
id: event_id,
ts: ts,
actor: 'SERVER',
step: null,
},
document: {
id: 123123, // Fix
},
session: {
entity_name: 'Foobar', // Fix
entity_id: 123123, // Fix
},
data: {
Data: {
bucket: delta_data,
},
},
delta: {
Data: {
bucket: delta_data,
},
},
program: {
Program: {
id: 'quote_server',
version: 'dadaddwafdwa', // Fix
},
},
} );
console.log('#sendmessage 2: '
+ (process.hrtime( startTime )[ 1 ] / 10000) + 'ms');
return channel.publish(
exchange,
routing_key,
avro_buffer,
{ headers: headers },
);
if ( !this._conn )
{
reject( 'Error sending message: No connection' );
return;
}
else if ( !this._channel )
{
reject( 'Error sending message: No channel' );
return;
}
else if ( !avro_buffer )
{
reject( 'Error sending message: No avro buffer' );
return;
}
console.log('#sendmessage 3: '
+ (process.hrtime( startTime )[ 1 ] / 10000) + 'ms');
// we don't use a routing key; fanout exchange
const published_successfully = this._channel.publish(
this._conf.exchange,
'',
avro_buffer,
{ headers: headers },
);
if ( published_successfully )
{
console.log('#sendmessage 4: '
+ (process.hrtime( startTime )[ 1 ] / 10000) + 'ms');
resolve();
return;
}
reject( 'Error sending message: publishing failed' );
} );
}
@ -218,13 +278,22 @@ export class DeltaPublisher implements AmqpPublisher
try
{
const type = avro.parse( this.SCHEMA_PATH );
buffer = type.toBuffer( data );
if ( !this._type )
{
this._dispatcher.dispatch(
'avro-err',
'No avro scheama found',
);
return null;
}
buffer = this._type.toBuffer( data );
}
catch( e )
{
this._dispatcher.dispatch(
'avro-parse-err',
'avro-err',
'Error encoding data to avro: ' + e,
);
}

View File

@ -22,41 +22,177 @@
*/
import { EventSubscriber } from "./event/EventSubscriber";
import { DeltaDao } from "./db/DeltaDao";
import { PositiveInteger } from "../numeric";
import { Histogram, Pushgateway, Counter, Gauge } from 'prom-client';
const client = require('prom-client');
const client = require( 'prom-client' );
declare type MetricStructure = {
path: string;
code: number;
service: string;
env: string;
// declare type MetricStructure = {
// path: string;
// code: number;
// service: string;
// env: string;
// }
export declare type PrometheusConfig = {
/** The hostname to connect to */
hostname: string;
/** The port to connect to */
port: number;
/** The environment ( dev, test, demo, live ) */
env: string;
}
export class MetricsCollector
{
/** The prometheus PushGateway */
private _gateway: Pushgateway;
/** Metric push interval */
private _push_interval_ms: PositiveInteger = <PositiveInteger>5000;
/** Delta processed time histogram */
private _process_time_hist: Histogram;
private _process_time_params: Pushgateway.Parameters = {
jobName: 'liza_delta_process_time'
};
/** Delta error counter */
private _process_error_count: Counter;
private _process_error_params: Pushgateway.Parameters = {
jobName: 'liza_delta_error'
};
/** Delta current error gauge */
private _current_error_gauge: Gauge;
private _current_error_params: Pushgateway.Parameters = {
jobName: 'liza_delta_current_error'
};
/** Delta error counter */
private _process_delta_count: Counter;
private _process_delta_params: Pushgateway.Parameters = {
jobName: 'liza_delta_success'
};
/**
* Initialize delta logger
*
* @param _conf - the prometheus configuration
* @param _subscriber - the event subscriber
*/
constructor(
private readonly _env: string,
private readonly _conf: PrometheusConfig,
private readonly _subscriber: EventSubscriber,
) {}
) {
// Set labels
const default_labels = {
env: this._conf.env,
service: 'delta_processor',
};
client.register.setDefaultLabels( default_labels );
// Create gateway
const url = 'http://' + this._conf.hostname + ':' + this._conf.port;
this._gateway = new client.Pushgateway( url );
// Create metrics
this._process_time_hist = new client.Histogram( {
name: this._process_time_params.jobName,
help: 'Time in ms for deltas to be processed',
labelNames: [ 'env', 'service' ],
buckets: client.linearBuckets(0, 10, 10),
} );
this._process_error_count = new client.Counter( {
name: this._process_error_params.jobName,
help: 'Error count for deltas being processed',
labelNames: [ 'env', 'service' ],
} );
this._current_error_gauge = new client.Gauge( {
name: this._current_error_params.jobName,
help: 'The current number of documents in an error state',
labelNames: [ 'env', 'service' ],
} );
this._process_delta_count = new client.Counter( {
name: this._process_delta_params.jobName,
help: 'Count of deltas successfully processed',
labelNames: [ 'env', 'service' ],
} );
// Push metrics on a specific intervals
setInterval( () => { this.pushMetrics(); }, this._push_interval_ms );
// Subsribe metrics to events
this.subscribeMetrics();
}
/**
* Initialize the logger to look for specific events
* Subscribe metrics
*/
init(): void
private subscribeMetrics()
{
const collectDefaultMetrics = client.collectDefaultMetrics;
this._subscriber.subscribe(
'delta-process-complete',
( val ) =>
{
console.log( 'Got time: ' + val + 'ms' );
this._process_time_hist.observe( val );
this._process_delta_count.inc();
}
);
console.log( this._subscriber, collectDefaultMetrics)
this._formatLog( '', 123 );
// this._registerEvent( 'document-processed', LogLevel.NOTICE );
// this._registerEvent( 'delta-publish', LogLevel.NOTICE );
// this._registerEvent( 'avro-parse-err', LogLevel.ERROR );
// this._registerEvent( 'mongodb-err', LogLevel.ERROR );
// this._registerEvent( 'publish-err', LogLevel.ERROR );
this._subscriber.subscribe(
'delta-process-error',
( _ ) => this._process_error_count.inc()
);
}
/**
* Push metrics to Prometheus PushGateway
*/
private pushMetrics(): void
{
// this._gateway.pushAdd( this._process_time_params, this.pushCallback );
// this._gateway.pushAdd( this._process_error_params, this.pushCallback );
// this._gateway.pushAdd( this._current_error_params, this.pushCallback );
// this._gateway.pushAdd( this._process_delta_params, this.pushCallback );
this._gateway.pushAdd(
{
jobName: 'liza_delta_metrics'
},
this.pushCallback
);
}
/**
* Handle push error
*
* @param error - Any errors that occurred
* @param response - The http response
* @param body - The resposne body
*/
private pushCallback(
_error?: Error | undefined,
_response?: any,
_body?: any
): void
{
// console.log( 'Push callback' );
// console.error( error, response, body );
}
@ -68,13 +204,35 @@ export class MetricsCollector
*
* @returns a structured logging object
*/
private _formatLog( path: string, code: number ): MetricStructure
// private _formatMetricVal( label: string, val: any ): MetricStructure
// {
// return <MetricStructure>{
// path: path,
// code: code,
// service: 'quote-server',
// env: this._conf.env,
// };
// }
/**
* Look for mongodb delta errors and update metrics if found
*
* @return any errors the occurred
*/
checkForErrors( dao: DeltaDao ): NullableError
{
return <MetricStructure>{
path: path,
code: code,
service: 'quote-server',
env: this._env,
};
dao.getErrorCount()
.then( count =>
{
// console.log( 'Error count: ', count );
this._current_error_gauge.set( +count );
} )
.catch( err =>
{
return err;
} );
return null;
}
}

View File

@ -47,7 +47,7 @@ export interface DeltaDao
* @param doc_id - Document whose index will be set
* @param type - Delta type
*
* @return any errors that occured
* @return any errors that occurred
*/
advanceDeltaIndex(
doc_id: DocumentId,
@ -62,11 +62,29 @@ export interface DeltaDao
* @param doc_id - The document to mark
* @param last_update_ts - The last time this document was updated
*
* @return true if the document was successfully marked as processed
* @return any errors that occurred
*/
markDocumentAsProcessed(
doc_id: DocumentId,
last_update_ts: UnixTimestamp,
): Promise<NullableError>
/**
* Flag the document as being in an error state
*
* @param doc_id - The document to flag
*
* @return any errors that occurred
*/
setErrorFlag( doc_id: DocumentId ): Promise<NullableError>
/**
* Get a count of documents in an error state
*
* @return a count of the documents in an error state
*/
getErrorCount(): Promise<number | Error>
}

View File

@ -59,7 +59,7 @@ export class MongoDeltaDao implements DeltaDao
*
* connectError event will be emitted on failure.
*
* @return any errors that occured
* @return any errors that occurred
*/
init(): Promise<NullableError>
{
@ -73,14 +73,18 @@ export class MongoDeltaDao implements DeltaDao
// if there was an error, don't bother with anything else
if ( err )
{
// in some circumstances, it may just be telling us that we're
// already connected (even though the connection may have been
// broken)
// in some circumstances, it may just be telling us that
// we're already connected (even though the connection may
// have been broken)
if ( err.errno !== undefined )
{
reject( 'Error opening mongo connection: ' + err );
return;
}
} else if ( db == null )
{
reject( 'No database connection' );
return;
}
// quotes collection
@ -116,7 +120,7 @@ export class MongoDeltaDao implements DeltaDao
);
}
);
});
} );
} );
}
@ -165,8 +169,10 @@ export class MongoDeltaDao implements DeltaDao
/**
* Set the document's processed index
*
* @param doc_id - Document whose index will be set
* @param type - Delta type
* @param doc_id - Document whose index will be set
* @param type - Delta type
*
* @return any errors that occurred
*/
advanceDeltaIndex(
doc_id: DocumentId,
@ -177,7 +183,7 @@ export class MongoDeltaDao implements DeltaDao
{
const inc_data: Record<string, any> = {};
inc_data[ 'lastPublishDelta.' + type ] = 1;
inc_data[ 'totalPublishDelta.' + type ] = 1;
this._collection!.update(
{ id: doc_id },
@ -206,7 +212,7 @@ export class MongoDeltaDao implements DeltaDao
* @param doc_id - The document to mark
* @param last_update_ts - The last time this document was updated
*
* @return true if the document was successfully marked as processed
* @return any errors that occurred
*/
markDocumentAsProcessed(
doc_id: DocumentId,
@ -231,7 +237,74 @@ export class MongoDeltaDao implements DeltaDao
return;
}
);
} );
}
/**
* Flag the document as being in an error state
*
* @param doc_id - The document to flag
*
* @return any errors that occurred
*/
setErrorFlag( doc_id: DocumentId ): Promise<NullableError>
{
return new Promise( ( resolve, reject ) =>
{
this._collection!.update(
{ id: doc_id },
{ $set: { deltaError: true } },
{ upsert: false },
function( err )
{
if ( err )
{
reject( "Failed setting error flag: " + err );
return;
}
resolve();
return;
}
);
} );
}
/**
* Get a count of documents in an error state
*
* @return a count of the documents in an error state
*/
getErrorCount(): Promise<number | Error>
{
return new Promise( ( resolve, reject ) =>
{
this._collection!.find(
{ deltaError: true },
{},
function( err, cursor )
{
if ( err )
{
reject( err );
return;
}
cursor.toArray( function( err: NullableError, data: any[] )
{
if ( err )
{
reject( err );
return;
}
// return the count
resolve( data.length );
});
}
)
} );
}
}

View File

@ -51,6 +51,14 @@ export interface MongoDb
* @param callback continuation on completion
*/
open( callback: MongoCallback ): void;
/**
* Close the database connection
*
* @param callback continuation on completion
*/
close( callback: MongoCallback ): void;
}

View File

@ -241,7 +241,7 @@ describe( 'system.DeltaProcessor', () =>
},
],
},
lastPublishDelta: {
totalPublishDelta: {
data: 0,
},
},
@ -274,7 +274,7 @@ describe( 'system.DeltaProcessor', () =>
},
],
},
lastPublishDelta: {
totalPublishDelta: {
data: 1,
},
},
@ -308,6 +308,8 @@ function createMockDeltaDao(): DeltaDao
getUnprocessedDocuments() { return Promise.resolve( [] ); },
advanceDeltaIndex() { return Promise.resolve( null ); },
markDocumentAsProcessed() { return Promise.resolve( null ); },
setErrorFlag() { return Promise.resolve( null ); },
getErrorCount() { return Promise.resolve( 0 ); },
};
}