1
0
Fork 0

[DEV-5312] Refactor references from 'self' to 'this', pass console into event logger, and add factory for prometheus

master
Austin Schaffer 2019-11-25 17:20:56 -05:00
parent 5ee9a5d340
commit 1b96cd9147
10 changed files with 495 additions and 309 deletions

21
.env
View File

@ -1,7 +1,16 @@
hostname=localhost
port=5672
username=quote_referral
password=
vhost=dev
exchange=quoteupdate
NODE_ENV=dev
amqp_hostname=localhost
amqp_port=5672
amqp_username=quote_referral
amqp_password=
amqp_frameMax=0
amqp_heartbeat=2
amqp_vhost=quote
amqp_exchange=quoteupdate
amqp_retries=30
amqp_retry_wait=1
prom_hostname=dmz2docker01.rsgcorp.local
prom_port=9091
prom_push_interval_ms=5000
process_interval_ms=2000

View File

@ -18,19 +18,20 @@
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
import fs = require( 'fs' );
import fs = require( 'fs' );
import { AmqpConfig } from "../src/system/AmqpPublisher";
import { MongoDeltaDao } from "../src/system/db/MongoDeltaDao";
import { DeltaProcessor } from "../src/system/DeltaProcessor";
import { DeltaPublisher } from "../src/system/DeltaPublisher";
import { MongoDb, MongoDbConfig } from "../src/types/mongodb";
import { DeltaLogger } from "../src/system/DeltaLogger";
import { EventEmitter } from "events";
import { AmqpConfig } from '../src/system/AmqpPublisher';
import { MongoDeltaDao } from '../src/system/db/MongoDeltaDao';
import { DeltaProcessor } from '../src/system/DeltaProcessor';
import { DeltaPublisher } from '../src/system/DeltaPublisher';
import { MongoDb, MongoDbConfig, MongoCollection } from '../src/types/mongodb';
import { EventLogger } from '../src/system/EventLogger';
import { EventEmitter } from 'events';
import { PrometheusFactory } from '../src/system/PrometheusFactory';
import {
MetricsCollector,
PrometheusConfig,
} from "../src/system/MetricsCollector";
} from '../src/system/MetricsCollector';
const {
Db: MongoDb,
@ -38,53 +39,40 @@ const {
ReplServers: ReplSetServers,
} = require( 'mongodb' );
// TODO: fix this
process.env.NODE_ENV = 'dev';
process.env.amqp_hostname = 'localhost';
process.env.amqp_port = '5672';
process.env.amqp_username = 'quote_referral';
process.env.amqp_password = 'Et7iojahwo4aePie9Cahng7Chu5eim4E';
process.env.amqp_frameMax = '0';
process.env.amqp_heartbeat = '2';
process.env.amqp_vhost = 'quote';
process.env.amqp_exchange = 'quoteupdate';
process.env.amqp_retries = '30';
process.env.amqp_retry_wait = '1';
process.env.prom_hostname = 'dmz2docker01.rsgcorp.local';
process.env.prom_port = '9091';
const amqp_conf = _getAmqpConfig( process.env );
const db_conf = _getMongoConfig( process.env );
const prom_conf = _getPrometheusConfig( process.env );
const env = process.env.NODE_ENV || 'Unknown Environment';
const process_interval_ms = +(process.env.process_interval_ms || 2000);
const emitter = new EventEmitter();
const db = _createDB( db_conf );
// Environment variables
const amqp_conf = _getAmqpConfig( process.env );
const db_conf = _getMongoConfig( process.env );
const prom_conf = _getPrometheusConfig( process.env );
const env = process.env.NODE_ENV || 'Unknown Environment';
// Prometheus Metrics
const prom_factory = new PrometheusFactory();
const metrics = new MetricsCollector( prom_factory, prom_conf, emitter );
// Event handling
const emitter = new EventEmitter();
// Event subscribers
new DeltaLogger( env, emitter, ts_ctr );
const metrics = new MetricsCollector( prom_conf, emitter );
// Instantiate classes for processor
const db = _createDB( db_conf );
const dao = new MongoDeltaDao( db );
const publisher = new DeltaPublisher( amqp_conf, emitter, ts_ctr );
const processor = new DeltaProcessor( dao, publisher, emitter );
// If the dao intializes successfully then process on a two second interval
const interval_ms = 2000;
// Structured logging
new EventLogger( console, env, emitter, ts_ctr );
let process_interval: NodeJS.Timer;
let dao: MongoDeltaDao;
let publisher: DeltaPublisher;
let processor: DeltaProcessor;
dao.init()
_getMongoCollection( db, db_conf )
.then( ( conn: MongoCollection ) =>
{
return new MongoDeltaDao( conn );
} )
.then( ( mongoDao: MongoDeltaDao ) =>
{
dao = mongoDao;
publisher = new DeltaPublisher( amqp_conf, emitter, ts_ctr );
processor = new DeltaProcessor( mongoDao, publisher, emitter );
} )
.then( _ =>
{
publisher.connect()
.catch( e =>
{
console.error( 'AMQP connection error: ' + e );
} );
publisher.connect();
} )
.then( _ =>
{
@ -99,10 +87,10 @@ dao.init()
processor.process();
metrics.checkForErrors( dao );
},
interval_ms,
process_interval_ms,
);
} )
.catch( err => { console.error( 'Error: ' + err ); } );
.catch( e => { console.error( 'Error: ' + e ); } );
/**
@ -151,27 +139,26 @@ function writePidFile( pid_path: string ): void
*/
function shutdown( signal: string ): void
{
console.log( "Received " + signal + ". Beginning graceful shutdown:" );
console.log( "...Stopping processing interval" );
console.log( 'Received ' + signal + '. Beginning graceful shutdown:' );
console.log( '...Stopping processing interval' );
clearInterval( process_interval );
console.log( "...Closing MongoDb connection" );
console.log( '...Closing MongoDb connection' );
db.close( ( err, _data ) =>
{
if ( err )
{
console.error( " Error closing connection: " + err );
console.error( ' Error closing connection: ' + err );
}
} );
console.log( "...Closing AMQP connection..." );
console.log( '...Closing AMQP connection...' );
publisher.close();
console.log( "Shutdown complete. Exiting." );
console.log( 'Shutdown complete. Exiting.' );
process.exit();
}
@ -225,6 +212,77 @@ function _createDB( conf: MongoDbConfig ): MongoDb
}
/**
* Attempts to connect to the database
*
* connectError event will be emitted on failure.
*
* @return any errors that occurred
*/
function _getMongoCollection(
db: MongoDb,
conf: MongoDbConfig
): Promise<MongoCollection>
{
return new Promise( ( resolve, reject ) =>
{
// attempt to connect to the database
db.open( ( e: any, db: any ) =>
{
// if there was an error, don't bother with anything else
if ( e )
{
// in some circumstances, it may just be telling us that
// we're already connected (even though the connection may
// have been broken)
if ( e.errno !== undefined )
{
reject( 'Error opening mongo connection: ' + e );
return;
}
} else if ( db == null )
{
reject( 'No database connection' );
return;
}
// quotes collection
db.collection(
conf.collection,
( e: any, collection: MongoCollection ) =>
{
if ( e )
{
reject( 'Error creating collection: ' + e );
return;
}
// initialize indexes
collection.createIndex(
[
['published', 1],
['deltaError', 1],
],
true,
( e: any, _index: { [P: string]: any } ) =>
{
if ( e )
{
reject( 'Error creating index: ' + e );
return;
}
resolve( collection );
return;
}
);
}
);
} );
} );
}
/**
* Create a mongodb configuration from the environment
*
@ -235,14 +293,15 @@ function _createDB( conf: MongoDbConfig ): MongoDb
function _getMongoConfig( env: any ): MongoDbConfig
{
return <MongoDbConfig>{
"port": +( env.MONGODB_PORT || 0 ),
"ha": +( env.LIZA_MONGODB_HA || 0 ) == 1,
"replset": env.LIZA_MONGODB_REPLSET,
"host": env.MONGODB_HOST,
"host_a": env.LIZA_MONGODB_HOST_A,
"port_a": +( env.LIZA_MONGODB_PORT_A || 0 ),
"host_b": env.LIZA_MONGODB_HOST_B,
"port_b": +( env.LIZA_MONGODB_PORT_B || 0 ),
'port': +( env.MONGODB_PORT || 0 ),
'ha': +( env.LIZA_MONGODB_HA || 0 ) == 1,
'replset': env.LIZA_MONGODB_REPLSET,
'host': env.MONGODB_HOST,
'host_a': env.LIZA_MONGODB_HOST_A,
'port_a': +( env.LIZA_MONGODB_PORT_A || 0 ),
'host_b': env.LIZA_MONGODB_HOST_B,
'port_b': +( env.LIZA_MONGODB_PORT_B || 0 ),
'collection': 'quotes',
};
}
@ -257,18 +316,18 @@ function _getMongoConfig( env: any ): MongoDbConfig
function _getAmqpConfig( env: any ): AmqpConfig
{
return <AmqpConfig>{
"protocol": "amqp",
"hostname": env.amqp_hostname,
"port": +( env.amqp_port || 0 ),
"username": env.amqp_username,
"password": env.amqp_password,
"locale": "en_US",
"frameMax": env.amqp_frameMax,
"heartbeat": env.amqp_heartbeat,
"vhost": env.amqp_vhost,
"exchange": env.amqp_exchange,
"retries": env.amqp_retries || 30,
"retry_wait": env.amqp_retry_wait || 1,
'protocol': 'amqp',
'hostname': env.amqp_hostname,
'port': +( env.amqp_port || 0 ),
'username': env.amqp_username,
'password': env.amqp_password,
'locale': 'en_US',
'frameMax': env.amqp_frameMax,
'heartbeat': env.amqp_heartbeat,
'vhost': env.amqp_vhost,
'exchange': env.amqp_exchange,
'retries': env.amqp_retries || 30,
'retry_wait': env.amqp_retry_wait || 1,
};
}
@ -283,8 +342,9 @@ function _getAmqpConfig( env: any ): AmqpConfig
function _getPrometheusConfig( env: any ): PrometheusConfig
{
return <PrometheusConfig>{
"hostname": env.prom_hostname,
"port": +( env.prom_port || 0 ),
"env": process.env.NODE_ENV,
'hostname': env.prom_hostname,
'port': +( env.prom_port || 0 ),
'env': process.env.NODE_ENV,
'push_interval_ms': +( process.env.prom_push_interval_ms || 5000 ),
};
}

View File

@ -57,14 +57,12 @@ export class DeltaProcessor
*/
process(): void
{
let self = this;
self._dao.getUnprocessedDocuments()
this._dao.getUnprocessedDocuments()
.then( docs =>
{
docs.forEach( doc =>
{
const deltas = self.getTimestampSortedDeltas( doc );
const deltas = this.getTimestampSortedDeltas( doc );
const doc_id: DocumentId = doc.id;
const last_updated_ts = doc.lastUpdate;
@ -74,14 +72,14 @@ export class DeltaProcessor
const startTime = process.hrtime();
let error = null;
self._publisher.publish( delta )
this._publisher.publish( delta )
.then( _ =>
{
self._dao.advanceDeltaIndex( doc_id, delta.type );
this._dao.advanceDeltaIndex( doc_id, delta.type );
} )
.catch( err =>
{
self._dao.setErrorFlag( doc_id );
this._dao.setErrorFlag( doc_id );
error = err;
} );
@ -90,9 +88,10 @@ export class DeltaProcessor
// this document if there was an error
if ( error )
{
self._emitter.emit(
this._emitter.emit(
'delta-process-error',
error
error,
doc_id + delta.timestamp + delta.type
);
return;
@ -101,17 +100,17 @@ export class DeltaProcessor
{
const elapsedTime = process.hrtime( startTime );
self._emitter.emit(
this._emitter.emit(
'delta-process-complete',
elapsedTime[ 1 ] / 10000
);
}
};
self._dao.markDocumentAsProcessed( doc_id, last_updated_ts )
this._dao.markDocumentAsProcessed( doc_id, last_updated_ts )
.then( _ =>
{
self._emitter.emit(
this._emitter.emit(
'document-processed',
'Deltas on document ' + doc_id + ' processed '
+ 'successfully. Document has been marked as '
@ -120,13 +119,13 @@ export class DeltaProcessor
} )
.catch( err =>
{
self._emitter.emit( 'mongodb-err', err );
this._emitter.emit( 'mongodb-err', err );
} );
} );
} )
.catch( err =>
{
self._emitter.emit( 'mongodb-err', err );
this._emitter.emit( 'mongodb-err', err );
} );
}
@ -171,7 +170,6 @@ export class DeltaProcessor
}
// Only return the unprocessed deltas
console.log( published_count );
const deltas_trimmed = deltas.slice( published_count );
// Mark each delta with its type

View File

@ -79,7 +79,7 @@ export class DeltaPublisher implements AmqpPublisher
/**
* Initialize connection
*/
connect(): Promise<NullableError>
connect(): Promise<null>
{
return new Promise<null>( ( resolve, reject ) =>
{
@ -181,9 +181,9 @@ export class DeltaPublisher implements AmqpPublisher
*
* @return whether the message was published successfully
*/
publish( delta: DeltaResult<any> ): Promise<NullableError>
publish( delta: DeltaResult<any> ): Promise<null>
{
return new Promise<NullableError>( ( resolve, reject ) =>
return new Promise<null>( ( resolve, reject ) =>
{
this.sendMessage( delta )
.then( _ =>
@ -220,9 +220,9 @@ export class DeltaPublisher implements AmqpPublisher
*
* @return whether publish was successful
*/
sendMessage( delta: DeltaResult<any> ): Promise<NullableError>
sendMessage( delta: DeltaResult<any> ): Promise<null>
{
return new Promise<NullableError>( ( resolve, reject ) =>
return new Promise<null>( ( resolve, reject ) =>
{
const ts = this._ts_ctr();
const headers = { version: 1, created: ts };

View File

@ -1,5 +1,5 @@
/**
* Delta logger
* Event logger
*
* Copyright (C) 2010-2019 R-T Specialty, LLC.
*
@ -18,7 +18,7 @@
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* Logger for delta events
* PSR-12 style logger based on node events
*/
import { EventEmitter } from "events";
@ -42,19 +42,20 @@ declare type StructuredLog = {
severity: string;
}
export class DeltaLogger
export class EventLogger
{
/**
* Initialize delta logger
* Initialize logger
*
* @param _env - The environment ( dev, test, demo, live )
* @param _emitter - An event emitter
* @param _ts_ctr - a timestamp constructor
*/
constructor(
private readonly _env: string,
private readonly _console: Console,
private readonly _env: string,
private readonly _emitter: EventEmitter,
private readonly _ts_ctr : () => UnixTimestamp,
private readonly _ts_ctr: () => UnixTimestamp,
) {
this.init();
}
@ -73,6 +74,16 @@ export class DeltaLogger
this._registerEvent( 'avro-err', LogLevel.ERROR );
this._registerEvent( 'mongodb-err', LogLevel.ERROR );
this._registerEvent( 'publish-err', LogLevel.ERROR );
// this._registerEvent( 'log', LogLevel.INFO );
// this._registerEvent( 'debug', LogLevel.DEBUG );
// this._registerEvent( 'info', LogLevel.INFO );
// this._registerEvent( 'notice', LogLevel.NOTICE );
// this._registerEvent( 'warning', LogLevel.WARNING );
// this._registerEvent( 'error', LogLevel.ERROR );
// this._registerEvent( 'critical', LogLevel.CRITICAL );
// this._registerEvent( 'alert', LogLevel.ALERT );
// this._registerEvent( 'emergency', LogLevel.EMERGENCY );
}
@ -103,18 +114,18 @@ export class DeltaLogger
{
case LogLevel.DEBUG:
case LogLevel.INFO:
return ( _ ) => console.info( this._formatLog( _, level ) );
return ( str ) => this._console.info( this._format( str, level ) );
case LogLevel.NOTICE:
return ( _ ) => console.log( this._formatLog( _, level ) );
return ( str ) => this._console.log( this._format( str, level ) );
case LogLevel.WARNING:
return ( _ ) => console.warn( this._formatLog( _, level ) );
return ( str ) => this._console.warn( this._format( str, level ) );
case LogLevel.ERROR:
case LogLevel.CRITICAL:
case LogLevel.ALERT:
case LogLevel.EMERGENCY:
return ( _ ) => console.error( this._formatLog( _, level ) );
return ( str ) => this._console.error( this._format( str, level ) );
default:
return ( _ ) => console.log( "UNKNOWN LOG LEVEL: " + _ );
return ( str ) => this._console.log( "UNKNOWN LOG LEVEL: " + str );
}
}
@ -127,7 +138,7 @@ export class DeltaLogger
*
* @returns a structured logging object
*/
private _formatLog( str: string, level: LogLevel ): StructuredLog
private _format( str: string, level: LogLevel ): StructuredLog
{
return <StructuredLog>{
message: str,

View File

@ -22,12 +22,11 @@
*/
import { DeltaDao } from "./db/DeltaDao";
import { PositiveInteger } from "../numeric";
import { Histogram, Pushgateway, Counter, Gauge } from 'prom-client';
import { EventEmitter } from "events";
import { PrometheusFactory } from './PrometheusFactory';
const client = require( 'prom-client' );
const client = require( 'prom-client' )
export declare type PrometheusConfig = {
/** The hostname to connect to */
@ -38,6 +37,9 @@ export declare type PrometheusConfig = {
/** The environment ( dev, test, demo, live ) */
env: string;
/** The rate (in milliseconds) at which metrics are pushed */
push_interval_ms: number;
}
@ -46,41 +48,38 @@ export class MetricsCollector
/** The prometheus PushGateway */
private _gateway: Pushgateway;
/** Metric push interval */
private _push_interval_ms: PositiveInteger = <PositiveInteger>5000;
/** Delta processed time histogram */
private _process_time_hist: Histogram;
private _process_time_params: Pushgateway.Parameters = {
jobName: 'liza_delta_process_time'
};
private _process_time: Histogram;
private _process_time_name: string = 'liza_delta_process_time';
private _process_time_help: string = 'Delta process time in ms';
/** Delta error counter */
private _process_error_count: Counter;
private _process_error_params: Pushgateway.Parameters = {
jobName: 'liza_delta_error'
};
private _total_error: Counter;
private _total_error_name: string = 'liza_delta_error';
private _total_error_help: string = 'Total errors from delta processing';
/** Delta current error gauge */
private _current_error_gauge: Gauge;
private _current_error_params: Pushgateway.Parameters = {
jobName: 'liza_delta_current_error'
};
private _current_error: Gauge;
private _current_error_name: string = 'liza_delta_current_error';
private _current_error_help: string =
'The current number of documents in an error state';
/** Delta error counter */
private _process_delta_count: Counter;
private _process_delta_params: Pushgateway.Parameters = {
jobName: 'liza_delta_success'
};
private _total_processed: Counter;
private _total_processed_name: string = 'liza_delta_success';
private _total_processed_help: string =
'Total deltas successfully processed';
/**
* Initialize delta logger
*
* @param _conf - the prometheus configuration
* @param _emitter - the event emitr
* @param _factory - A factory to create prometheus components
* @param _conf - Prometheus configuration
* @param _emitter - Event emitter
*/
constructor(
private readonly _conf: PrometheusConfig,
private readonly _factory: PrometheusFactory,
private readonly _conf: PrometheusConfig,
private readonly _emitter: EventEmitter,
) {
// Set labels
@ -89,68 +88,72 @@ export class MetricsCollector
service: 'delta_processor',
} );
// Create gateway
const url = 'http://' + this._conf.hostname + ':' + this._conf.port;
this._gateway = new client.Pushgateway( url );
// Create metrics
this._process_time_hist = new client.Histogram( {
name: this._process_time_params.jobName,
help: 'Time in ms for deltas to be processed',
labelNames: [ 'env', 'service' ],
buckets: client.linearBuckets(0, 10, 10),
} );
this._gateway = this._factory.createGateway(
client,
this._conf.hostname,
this._conf.port,
);
this._process_error_count = new client.Counter( {
name: this._process_error_params.jobName,
help: 'Error count for deltas being processed',
labelNames: [ 'env', 'service' ],
} );
this._process_time = this._factory.createHistogram(
client,
this._process_time_name,
this._process_time_help,
0,
10,
10,
);
this._current_error_gauge = new client.Gauge( {
name: this._current_error_params.jobName,
help: 'The current number of documents in an error state',
labelNames: [ 'env', 'service' ],
} );
this._total_error = this._factory.createCounter(
client,
this._total_error_name,
this._total_error_help,
);
this._process_delta_count = new client.Counter( {
name: this._process_delta_params.jobName,
help: 'Count of deltas successfully processed',
labelNames: [ 'env', 'service' ],
} );
this._current_error = this._factory.createGauge(
client,
this._current_error_name,
this._current_error_help,
);
// Push metrics on a specific intervals
this._total_processed = this._factory.createCounter(
client,
this._total_processed_name,
this._total_processed_help,
);
// Push metrics on a specific interval
setInterval(
() =>
{
this._gateway.pushAdd(
{ jobName: 'liza_delta_metrics' }, this.pushCallback
);
}, this._push_interval_ms
}, this._conf.push_interval_ms
);
// Subsribe metrics to events
this.emitMetrics();
this.hookMetrics();
}
/**
* emit metrics
* List to events to update metrics
*/
private emitMetrics()
private hookMetrics()
{
this._emitter.on(
'delta-process-complete',
( val: any ) =>
{
this._process_time_hist.observe( val );
this._process_delta_count.inc();
this._process_time.observe( val );
this._total_processed.inc();
}
);
this._emitter.on(
'delta-process-error',
( _ ) => this._process_error_count.inc()
( _ ) => this._total_error.inc()
);
}
@ -182,15 +185,8 @@ export class MetricsCollector
checkForErrors( dao: DeltaDao ): NullableError
{
dao.getErrorCount()
.then( count =>
{
// console.log( 'Error count: ', count );
this._current_error_gauge.set( +count );
} )
.catch( err =>
{
return err;
} );
.then( count => { this._current_error.set( +count ); } )
.catch( err => { return err; } );
return null;
}

View File

@ -0,0 +1,122 @@
/**
* Prometheus Factory functions
*
* Copyright (C) 2010-2019 R-T Specialty, LLC.
*
* This file is part of liza.
*
* liza is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* Prometheus Metrics
*/
import { Pushgateway, Histogram, Counter, Gauge } from 'prom-client';
export class PrometheusFactory
{
/**
* Create a PushGateway
*
* @param client - prometheus client
* @param url - the url of the push gateway
*
* @return the gateway
*/
createGateway(
client: any,
hostname: string,
port: number,
): Pushgateway
{
const url = 'http://' + hostname + ':' + port;
return new client.Pushgateway( url );
}
/**
* Create a histogram metric
*
* @param client - prometheus client
* @param name - metric name
* @param help - a description of the metric
* @param bucket_start - where to start the range of buckets
* @param bucket_width - the size of each bucket
* @param bucket_count - the total number of buckets
*
* @return the metric
*/
createHistogram(
client: any,
name: string,
help: string,
bucket_start: number,
bucket_width: number,
bucket_count: number,
): Histogram
{
return new client.Histogram( {
name: name,
help: help,
buckets: client.linearBuckets(
bucket_start,
bucket_width,
bucket_count
),
} );
}
/**
* Create a counter metric
*
* @param client - prometheus client
* @param name - metric name
* @param help - a description of the metric
*
* @return the metric
*/
createCounter(
client: any,
name: string,
help: string,
): Counter
{
return new client.Counter( {
name: name,
help: help,
} );
}
/**
* Create a gauge metric
*
* @param client - prometheus client
* @param name - metric name
* @param help - a description of the metric
*
* @return the metric
*/
createGauge(
client: any,
name: string,
help: string,
): Gauge
{
return new client.Gauge( {
name: name,
help: help,
} );
}
}

View File

@ -21,9 +21,11 @@
* Get deltas from the mongo document in order to process and publish them
*/
import { DocumentId } from "../../document/Document";
import { DeltaDao } from "./DeltaDao";
import { MongoCollection } from "mongodb";
import { DocumentId } from '../../document/Document';
import { DeltaDao } from './DeltaDao';
import { MongoCollection } from 'mongodb';
import { context } from '../../error/ContextError';
import { MongoError } from './MongoError';
export type MongoDeltaType = 'ratedata' | 'data';
@ -31,18 +33,12 @@ export type MongoDeltaType = 'ratedata' | 'data';
/** Manage deltas */
export class MongoDeltaDao implements DeltaDao
{
/** Collection used to store quotes */
readonly COLLECTION: string = 'quotes';
/** The ratedata delta type */
static readonly DELTA_RATEDATA: string = 'ratedata';
/** The data delta type */
static readonly DELTA_DATA: string = 'data';
/** The mongo quotes collection */
private _collection?: MongoCollection | null;
/**
* Initialize connection
@ -50,81 +46,10 @@ export class MongoDeltaDao implements DeltaDao
* @param _db Mongo db
*/
constructor(
private readonly _db: any,
private readonly _collection: MongoCollection,
) {}
/**
* Attempts to connect to the database
*
* connectError event will be emitted on failure.
*
* @return any errors that occurred
*/
init(): Promise<null>
{
var dao = this;
return new Promise( ( resolve, reject ) =>
{
// attempt to connect to the database
this._db.open( function( err: any, db: any )
{
// if there was an error, don't bother with anything else
if ( err )
{
// in some circumstances, it may just be telling us that
// we're already connected (even though the connection may
// have been broken)
if ( err.errno !== undefined )
{
reject( 'Error opening mongo connection: ' + err );
return;
}
} else if ( db == null )
{
reject( 'No database connection' );
return;
}
// quotes collection
db.collection(
dao.COLLECTION,
function(
_err: any,
collection: MongoCollection,
) {
// for some reason this gets called more than once
if ( collection == null )
{
return;
}
// initialize indexes
collection.createIndex(
[ ['id', 1] ],
true,
function( err: any, _index: { [P: string]: any } )
{
if ( err )
{
reject( 'Error creating index: ' + err );
return;
}
// mark the DAO as ready to be used
dao._collection = collection;
resolve();
return;
}
);
}
);
} );
} );
}
/**
* Get documents in need of processing
*
@ -132,34 +57,37 @@ export class MongoDeltaDao implements DeltaDao
*/
getUnprocessedDocuments(): Promise<Record<string, any>[]>
{
var self = this;
return new Promise( ( resolve, reject ) =>
{
if ( !self._collection )
{
reject( 'Database not ready' );
return;
}
this._collection!.find(
this._collection.find(
{ published: false },
{},
function( _err, cursor )
( e, cursor ) =>
{
cursor.toArray( function( _err: NullableError, data: any[] )
if ( e )
{
// was the quote found?
if ( data.length == 0 )
reject(
new MongoError(
'Error fetching unprocessed documents: ' + e
)
);
return
}
cursor.toArray( ( e: Error, data: any[] ) =>
{
if ( e )
{
resolve( [] );
reject(
new MongoError(
'Error fetching array from cursor: ' + e
)
);
return;
}
// return the quote data
resolve( data );
});
} );
}
)
} );
@ -185,20 +113,27 @@ export class MongoDeltaDao implements DeltaDao
inc_data[ 'totalPublishDelta.' + type ] = 1;
this._collection!.update(
this._collection.update(
{ id: doc_id },
{ $inc: inc_data },
{ upsert: false },
function( err )
e =>
{
if ( err )
if ( e )
{
reject( 'Error advancing delta index: ' + err )
reject( context(
new MongoError(
'Error advancing delta index: ' + e
),
{
doc_id: doc_id,
type: type,
}
) );
return;
}
resolve();
return;
}
);
} );
@ -206,7 +141,9 @@ export class MongoDeltaDao implements DeltaDao
/**
* Mark a given document as processed. First does a check to make sure that
* Mark a given document as processed.
*
* First does a check to make sure that
* the document does not have a newer update timestamp than the provided one
*
* @param doc_id - The document to mark
@ -221,15 +158,23 @@ export class MongoDeltaDao implements DeltaDao
{
return new Promise( ( resolve, reject ) =>
{
this._collection!.update(
this._collection.update(
{ id: doc_id, lastUpdate: { $lte: last_update_ts } },
{ $set: { published: true } },
{ upsert: false },
function( err )
e =>
{
if ( err )
if ( e )
{
reject( "Error marking document as processed: " + err );
reject( context(
new MongoError(
'Error marking document as processed: ' + e
),
{
doc_id: doc_id,
last_update_ts: last_update_ts,
}
) );
return;
}
@ -252,15 +197,22 @@ export class MongoDeltaDao implements DeltaDao
{
return new Promise( ( resolve, reject ) =>
{
this._collection!.update(
this._collection.update(
{ id: doc_id },
{ $set: { deltaError: true } },
{ upsert: false },
function( err )
e =>
{
if ( err )
if ( e )
{
reject( "Failed setting error flag: " + err );
reject( context(
new MongoError(
'Failed setting error flag: ' + e
),
{
doc_id: doc_id,
}
) );
return;
}
@ -281,26 +233,36 @@ export class MongoDeltaDao implements DeltaDao
{
return new Promise( ( resolve, reject ) =>
{
this._collection!.find(
this._collection.find(
{ deltaError: true },
{},
function( err, cursor )
( e, cursor ) =>
{
if ( err )
if ( e )
{
reject( err );
reject(
new Error(
'Failed getting error count: ' + e
)
);
return;
}
cursor.toArray( function( err: NullableError, data: any[] )
cursor.toArray( ( e: NullableError, data: any[] ) =>
{
if ( err )
if ( e )
{
reject( err );
reject( context(
new MongoError(
'Failed getting error count: ' + e
),
{
cursor: cursor,
}
) );
return;
}
// return the count
resolve( data.length );
});
}

View File

@ -0,0 +1,27 @@
/**
* Mongodb error
*
* Copyright (C) 2010-2019 R-T Specialty, LLC.
*
* This file is part of the Liza Data Collection Framework.
*
* liza is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* This still uses ease.js because it does a good job of transparently
* creating Error subtypes.
*/
const { Class } = require( 'easejs' );
export const MongoError = Class( 'MongoError' ).extend( Error, {} );

View File

@ -37,6 +37,9 @@ export interface MongoDbConfig extends Record<string, any> {
/** High availability */
ha: boolean;
/** The mongodb collection to read from */
collection: string;
}
@ -62,9 +65,7 @@ export interface MongoDb
}
/**
* Node-style callback for queries
*/
/** Node-style callback for queries */
type MongoCallback = ( err: NullableError, data: { [P: string]: any } ) => void;