
Merge branch 'jira-5312' into 'master'

[DEV-5312] Process to publish quote modifications to new exchange

See merge request floss/liza!68
Schaffer, Austin 2019-12-12 15:21:04 -05:00
commit 6c38f7d164
61 changed files with 6209 additions and 478 deletions

17
.env 100644

@ -0,0 +1,17 @@
AMQP_HOST=localhost
AMQP_PORT=5672
AMQP_USER=
AMQP_PASS=
AMQP_FRAMEMAX=0
AMQP_HEARTBEAT=2
AMQP_VHOST=
AMQP_EXCHANGE=
AMQP_RETRIES=30
AMQP_RETRY_WAIT=1
PROM_HOST=
PROM_PORT=9091
PROM_PUSH_INTERVAL_MS=5000
PROM_BUCKETS_START=0
PROM_BUCKETS_WIDTH=10
PROM_BUCKETS_COUNT=10
PROCESS_INTERVAL_MS=2000
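
These variables are read at startup by the delta processor below, via
dotenv-flow and factory helpers such as createAmqpConfig. As a rough sketch
of how such a factory might map the environment (the field names here are
illustrative assumptions; the actual definition lives in
src/system/AmqpPublisher):

// Hypothetical shape only; see src/system/AmqpPublisher for the real one
function createAmqpConfig( env: NodeJS.ProcessEnv )
{
    // defaults mirror the .env template above
    return {
        hostname:   env.AMQP_HOST     || 'localhost',
        port:       +( env.AMQP_PORT      || 5672 ),
        username:   env.AMQP_USER     || '',
        password:   env.AMQP_PASS     || '',
        vhost:      env.AMQP_VHOST    || '/',
        exchange:   env.AMQP_EXCHANGE || '',
        frame_max:  +( env.AMQP_FRAMEMAX   || 0 ),
        heartbeat:  +( env.AMQP_HEARTBEAT  || 2 ),
        retries:    +( env.AMQP_RETRIES    || 30 ),
        retry_wait: +( env.AMQP_RETRY_WAIT || 1 ),
    };
}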

4
.gitignore vendored

@ -8,6 +8,7 @@ Makefile.in
# generated by configure
bin/server
bin/delta-processor
src/version.js
/config.*
Makefile
@ -27,6 +28,7 @@ src/**/index.js
# npm
node_modules
# typescript
# typescript output
bin/*.js
tsconfig.tsbuildinfo


@ -0,0 +1,33 @@
#!/bin/sh
# Start Liza delta processor using Node.js executable determined at
# configure-time
#
# Copyright (C) 2010-2019 R-T Specialty, LLC.
#
# This file is part of liza.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# In addition to the configure-time NODE_FLAGS, the NODE_FLAGS environment
# variable can be used to pass additional arguments to this script.
# WARNING: NODE_FLAGS arguments provided via environment variables are _not_
# escaped, so be mindful of word expansion!
#
# @AUTOGENERATED@
##
cd "$( dirname $( readlink -f "$0" ) )"
exec "@NODE@" @NODE_FLAGS@ $NODE_FLAGS delta-processor.js "$@"


@ -0,0 +1,175 @@
/**
* Start the Liza delta processor
*
* Copyright (C) 2010-2019 R-T Specialty, LLC.
*
* This file is part of the Liza Data Collection Framework.
*
* liza is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
import * as amqplib from 'amqplib';
import { createAmqpConfig } from '../src/system/AmqpPublisher';
import { MongoDeltaDao } from '../src/system/db/MongoDeltaDao';
import { DeltaProcessor } from '../src/system/DeltaProcessor';
import { DeltaPublisher } from '../src/system/DeltaPublisher';
import { MongoCollection } from '../src/types/mongodb';
import { createAvroEncoder } from '../src/system/avro/AvroFactory';
import { V1MessageWriter } from '../src/system/avro/V1MessageWriter';
import {
createMongoConfig,
createMongoDB,
getMongoCollection,
} from '../src/system/db/MongoFactory';
import { EventMediator } from '../src/system/EventMediator';
import { EventEmitter } from 'events';
import { StandardLogger } from '../src/system/StandardLogger';
import { MetricsCollector } from '../src/system/MetricsCollector';
import {
PrometheusFactory,
createPrometheusConfig,
} from '../src/system/PrometheusFactory';
import { AmqpConnection } from '../src/system/amqp/AmqpConnection';
import { parse as avro_parse } from 'avro-js';
require('dotenv-flow').config();
const amqp_conf = createAmqpConfig( process.env );
const prom_conf = createPrometheusConfig( process.env );
const db_conf = createMongoConfig( process.env );
const db = createMongoDB( db_conf );
const process_interval_ms = +( process.env.PROCESS_INTERVAL_MS || 2000 );
const env = process.env.NODE_ENV || 'Unknown Environment';
const emitter = new EventEmitter();
const log = new StandardLogger( console, ts_ctr, env );
const amqp_connection = new AmqpConnection( amqplib, amqp_conf, emitter );
const message_writer = new V1MessageWriter(
createAvroEncoder,
avro_parse( __dirname + '/../src/system/avro/schema.avsc' ),
);
const publisher = new DeltaPublisher(
emitter,
ts_ctr,
amqp_connection,
message_writer,
);
// Prometheus Metrics
const prom_factory = new PrometheusFactory();
const metrics = new MetricsCollector(
prom_factory,
prom_conf,
emitter,
process.hrtime,
);
// Structured logging
new EventMediator( log, emitter );
let process_interval: NodeJS.Timer;
let dao: MongoDeltaDao;
getMongoCollection( db, db_conf )
.then( ( conn: MongoCollection ) => { return new MongoDeltaDao( conn ); } )
.then( ( mongoDao: MongoDeltaDao ) => { dao = mongoDao; } )
.then( _ => amqp_connection.connect() )
.then( _ =>
{
log.info( 'Liza Delta Processor' );
handleShutdown();
const processor = new DeltaProcessor( dao, publisher, emitter );
return new Promise( ( _resolve, reject ) =>
{
process_interval = setInterval( () =>
{
try
{
processor.process()
.catch( err => reject( err ) );
}
catch ( err )
{
reject( err );
}
dao.getErrorCount()
.then( count => { metrics.updateErrorCount( count ) } );
}, process_interval_ms );
} );
} )
.catch( e =>
{
emitter.emit( 'error', e );
process.exit( 1 );
} );
/**
* Hook shutdown events
*/
function handleShutdown(): void
{
process.on( 'SIGINT', () => { shutdown( 'SIGINT' ); } )
.on( 'SIGTERM', () => { shutdown( 'SIGTERM' ); } );
}
/**
* Perform a graceful shutdown
*
* @param signal - the signal that caused the shutdown
*/
function shutdown( signal: string ): void
{
log.info( 'Received ' + signal + '. Beginning graceful shutdown:' );
log.info( '...Stopping processing interval' );
clearInterval( process_interval );
log.info( '...Closing MongoDb connection' );
db.close( ( err, _data ) =>
{
if ( err )
{
console.error( ' Error closing connection: ' + err );
}
} );
log.info( '...Closing AMQP connection...' );
amqp_connection.close();
log.info( '...Stopping the metrics collector...' );
metrics.stop();
log.info( 'Shutdown complete. Exiting.' );
process.exit();
}
/** Timestamp constructor
*
* @return a timestamp
*/
function ts_ctr(): UnixTimestamp
{
return <UnixTimestamp>Math.floor( new Date().getTime() / 1000 );
}


@ -1,7 +1,7 @@
/**
* Start the Liza Server
*
* Copyright (C) 2017 R-T Specialty, LLC.
* Copyright (C) 2010-2019 R-T Specialty, LLC.
*
* This file is part of the Liza Data Collection Framework.
*
@ -19,19 +19,12 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
'use strict';
import fs = require( 'fs' );
import path = require( 'path' );
const fs = require( 'fs' );
const path = require( 'path' );
const {
conf: {
ConfLoader,
ConfStore,
},
server,
version,
} = require( '../' );
import { ConfLoader } from "../src/conf/ConfLoader";
import { ConfStore } from "../src/conf/ConfStore";
import * as version from "../src/version";
// kluge for now
const conf_path = (
@ -42,7 +35,7 @@ const conf_path = (
const conf_dir = path.dirname( conf_path );
ConfLoader( fs, ConfStore )
new ConfLoader( fs, ConfStore )
.fromFile( conf_path )
.then( conf => Promise.all( [
conf.get( 'name' ),
@ -70,12 +63,12 @@ ConfLoader( fs, ConfStore )
* Produce an absolute path if `path` is absolute, otherwise a path relative
* to the configuration directory
*
* @param {string} conf_path configuration path (for relative `path`)
* @param {string} path path to resolve
* @param conf_path - configuration path (for relative `path`)
* @param path - path to resolve
*
* @return resolved path
*/
function _resolvePath( conf_path, path )
function _resolvePath( conf_path: string, path: string ): string
{
return ( path[ 0 ] === '/' )
? path
@ -83,15 +76,29 @@ function _resolvePath( conf_path, path )
}
function writePidFile( pid_path )
/**
* Write process id (PID) file
*
* @param pid_path - path to pid file
*/
function writePidFile( pid_path: string ): void
{
fs.writeFile( pid_path, process.pid );
fs.writeFileSync( pid_path, process.pid );
process.on( 'exit', () => fs.unlink( pid_path ) );
process.on( 'exit', () => fs.unlink( pid_path, () => {} ) );
}
function greet( name, pid_path )
/**
* Output greeting
*
* The greeting contains the program name, version, configuration path,
* and PID file path.
*
* @param name - program name
* @param pid_path - path to PID file
*/
function greet( name: string, pid_path: string ): void
{
console.log( `${name} (liza-${version})`);
console.log( `Server configuration: ${conf_path}` );


@ -62,6 +62,7 @@
"vhost": "/",
"queueName": "postrate"
}
},
"c1export": {
"host": "localhost",


@ -1,6 +1,6 @@
## For use by automake and autoconf
#
# Copyright (C) 2014--2017 R-T Specialty, LLC.
# Copyright (C) 2010-2019 R-T Specialty, LLC.
#
# This file is part of liza.
#
@ -88,6 +88,8 @@ AC_CONFIG_FILES([Makefile package.json
src/version.js])
AC_CONFIG_FILES([bin/server],
[chmod +x bin/server])
AC_CONFIG_FILES([bin/delta-processor],
[chmod +x bin/delta-processor])
AC_OUTPUT


@ -16,7 +16,8 @@
},
"bin": {
"liza-server": "bin/server"
"liza-server": "bin/server",
"delta-processor": "bin/delta-processor"
},
"scripts": {
@ -24,13 +25,14 @@
},
"dependencies": {
"easejs": "0.2.x",
"mongodb": "1.2.14",
"amqplib": "0.5.3"
"easejs": "0.2.x",
"mongodb": "1.2.14",
"dotenv-flow": "3.1.0",
"amqplib": "0.5.3"
},
"devDependencies": {
"typescript": "~3.7",
"@types/node": "@TS_NODE_VERSION@",
"@types/node": "12.12.11",
"chai": ">=1.9.1 < 4",
"@types/chai": ">=1.9.1 < 4",
"chai-as-promised": "7.1.0",
@ -38,7 +40,10 @@
"mocha": "5.2.0",
"@types/mocha": "5.2.0",
"sinon": ">=1.17.4",
"es6-promise": "~3"
"es6-promise": "~3",
"@types/amqplib": "0.5.13",
"avro-js": "1.9.1",
"prom-client": "11.0.0"
},
"licenses": [


@ -18,14 +18,21 @@
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
import { DocumentId } from '../document/Document';
/** The data structure expected for a document's internal key/value store */
export type Kv<T = any> = Record<string, T[]>;
/** Possible delta values for Kv array indexes */
export type DeltaDatum<T> = T | null | undefined;
/** Possible delta types */
export type DeltaType = 'ratedata' | 'data';
/**
* The constructor type for a delta generating function
*
@ -44,7 +51,63 @@ export type DeltaConstructor<T = any, U extends Kv<T> = Kv<T>, V extends Kv<T> =
export type DeltaResult<T> = { [K in keyof T]: DeltaDatum<T[K]> | null };
/**
/** Complete delta type */
export type Delta<T> = {
type: DeltaType,
timestamp: UnixTimestamp,
data: DeltaResult<T>,
}
/** Reverse delta type */
export type ReverseDelta<T> = {
data: Delta<T>[],
ratedata: Delta<T>[],
}
/** Structure for published delta counts */
export type PublishDeltaCount = {
data?: number,
ratedata?: number,
}
/**
* Document structure
*/
export interface DeltaDocument
{
/** The document id */
id: DocumentId,
/** The entity name */
agentName: string,
/** The entity id */
agentEntityId: number,
/** The time the document was created */
startDate: UnixTimestamp,
/** The time the document was updated */
lastUpdate: UnixTimestamp,
/** The data bucket */
data: Record<string, any>,
/** The rate data bucket */
ratedata?: Record<string, any>,
/** The calculated reverse deltas */
rdelta?: ReverseDelta<any>,
/** A count of how many of each delta type have been processed */
totalPublishDelta?: PublishDeltaCount,
};
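
To make the shapes above concrete, an illustrative value (all contents
invented; UnixTimestamp is this project's ambient nominal timestamp type):

// Illustrative only: a 'data' delta recording one changed key
const example_delta: Delta<Kv> =
{
    type:      'data',
    timestamp: <UnixTimestamp>1576181102,
    data:      { applicant_name: [ 'New Name' ] },
};
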
/**
* Create delta to transform from src into dest
*
* @param src - the source data set
@ -97,13 +160,115 @@ export function createDelta<T, U extends Kv<T>, V extends Kv<T>>(
}
/**
* Apply a delta to a bucket
*
* @param bucket - The bucket data
* @param delta - The delta to apply
*
* @return the bucket with the delta applied
*/
export function applyDelta<T, U extends Kv<T>, V extends Kv<T>>(
bucket: U = <U>{},
delta: DeltaResult<U & V>,
): U
{
const appliedDelta: DeltaResult<any> = {};
if( !delta )
{
return bucket;
}
// Loop through all keys
const key_set = new Set(
Object.keys( bucket ).concat( Object.keys( delta ) ) );
key_set.forEach( key =>
{
const bucket_data = bucket[ key ];
const delta_data = delta[ key ];
// If bucket does not contain the key, use entire delta data
if ( !bucket_data || !bucket_data.length )
{
appliedDelta[ key ] = delta_data;
return;
}
// A null delta marks the key as removed; drop it from the result
if ( delta_data === null )
{
return;
}
// If delta does not contain the key then retain bucket data
if ( delta_data === undefined )
{
appliedDelta[ key ] = bucket_data;
return;
}
// If neither condition above holds, apply the delta key iteratively
appliedDelta[ key ] = _applyDeltaKey( bucket_data, delta_data );
} );
return <U>appliedDelta;
}
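
A quick usage sketch of the behavior implemented above (values invented):

// Keys absent from the delta (undefined) retain their bucket data; keys
// present in the delta overwrite the corresponding array indexes
const bucket: Kv               = { name: [ 'Foo' ], age: [ '25' ] };
const delta:  DeltaResult<any> = { age: [ '26' ] };

applyDelta( bucket, delta );
// => { name: [ 'Foo' ], age: [ '26' ] }
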
/**
* Apply the delta key iteratively
*
* @param bucket - The bucket data array
* @param delta - The delta data array
*
* @return the applied delta
*/
function _applyDeltaKey<T>(
bucket: T[],
delta: T[],
): DeltaDatum<T>[]
{
const data = [];
const max_size = Math.max( delta.length, bucket.length );
for ( let i = 0; i < max_size; i++ )
{
const delta_datum = delta[ i ];
const bucket_datum = bucket[ i ];
if ( delta_datum === null )
{
break;
}
else if ( delta_datum === undefined )
{
data[ i ] = bucket_datum;
}
else if ( _deepEqual( delta_datum, bucket_datum ) )
{
data[ i ] = bucket_datum;
}
else
{
data[ i ] = delta_datum;
}
}
return data;
}
/**
* Build the delta key iteratively
*
* @param src - the source data array
* @param dest - the destination data array
*
* @return an object with an identical flag and a data array
* @return an object with a changed flag and a data array
*/
function _createDeltaKey<T>(
src: T[],


@ -19,9 +19,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
'use strict';
const { Class } = require( 'easejs' );
import { readFile } from "fs";
import { Store } from "../store/Store";
/**
@ -35,21 +34,8 @@ const { Class } = require( 'easejs' );
* TODO: Merging multiple configuration files would be convenient for
* modular configuration.
*/
module.exports = Class( 'ConfLoader',
export class ConfLoader
{
/**
* Filesystem module
* @type {fs}
*/
'private _fs': null,
/**
* Store object constructor
* @type {function():Store}
*/
'private _storeCtor': null,
/**
* Initialize with provided filesystem module and Store constructor
*
@ -57,14 +43,13 @@ module.exports = Class( 'ConfLoader',
* Node.js'. The Store constructor `store_ctor` is used to instantiate
* new stores to be populated with configuration data.
*
* @param {fs} fs filesystem module
* @param {function():Store} store_ctor Store object constructor
* @param fs - filesystem module
* @param store_ctor - Store object constructor
*/
constructor( fs, store_ctor )
{
this._fs = fs;
this._storeCtor = store_ctor;
},
constructor(
private _fs: { readFile: typeof readFile },
private _storeCtor: () => Store,
) {}
/**
@ -72,11 +57,11 @@ module.exports = Class( 'ConfLoader',
*
* A Store will be produced, populated with the configuration data.
*
* @param {string} filename path to configuration JSON
* @param filename - path to configuration JSON
*
* @return {Promise.<Store>} a promise of a populated Store
* @return a promise of a populated Store
*/
'public fromFile'( filename )
fromFile( filename: string ): Promise<Store>
{
return new Promise( ( resolve, reject ) =>
{
@ -104,7 +89,7 @@ module.exports = Class( 'ConfLoader',
}
} );
} );
},
}
/**
@ -112,12 +97,12 @@ module.exports = Class( 'ConfLoader',
*
* Parses configuration string as JSON.
*
* @param {string} data raw configuration data
* @param data - raw configuration data
*
* @return {Promise.<Object>} `data` parsed as JSON
* @return `data` parsed as JSON
*/
'virtual protected parseConfData'( data )
protected parseConfData( data: string ): Promise<any>
{
return Promise.resolve( JSON.parse( data ) );
},
} );
}
}
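
A usage sketch mirroring bin/server.ts above (the configuration path is
invented for illustration):

import * as fs from 'fs';
import { ConfLoader } from './src/conf/ConfLoader';
import { ConfStore } from './src/conf/ConfStore';

// load a configuration Store and read a value from it
new ConfLoader( fs, ConfStore )
    .fromFile( '/etc/liza/server.json' )
    .then( conf => conf.get( 'name' ) )
    .then( name => console.log( 'Loaded configuration for ' + name ) );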

32
src/conf/ConfStore.d.ts vendored 100644

@ -0,0 +1,32 @@
/**
* Ideal Store for system configuration
*
* Copyright (C) 2010-2019 R-T Specialty, LLC.
*
* This file is part of the Liza Data Collection Framework.
*
* liza is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
import { Store } from "../store/Store";
/**
* A store that recursively instantiates itself
*
* This store is ideal for nested configurations, and handles cases where
* configuration might be asynchronously retrieved. Nested values may be
* retrieved by delimiting the key with `.` (e.g. `foo.bar.baz`); see
* trait `DelimitedKey` for more information and examples.
*/
export declare function ConfStore(): Store;


@ -36,7 +36,7 @@ const {
* retrieved by delimiting the key with `.` (e.g. `foo.bar.baz`); see
* trait `DelimitedKey` for more information and examples.
*/
module.exports = function ConfStore()
exports.ConfStore = function ConfStore()
{
return MemoryStore
.use( AutoObjectStore( ConfStore ) )


@ -18,7 +18,7 @@
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* The term "Quote" is synonymous with "Document"; this project is moving
* The term 'Quote' is synonymous with 'Document'; this project is moving
* more toward the latter as it is further generalized.
*/
@ -31,7 +31,29 @@ export type DocumentId = NominalType<number, 'DocumentId'>;
/**
* Quote (Document) id
*
* Where the term "Quote" is still used, this will allow for type
* Where the term 'Quote' is still used, this will allow for type
* compatibility and an easy transition.
*/
export type QuoteId = DocumentId;
/**
* Document meta data
*/
export type DocumentMeta =
{
/** The document id */
id: DocumentId,
/** The entity name */
entity_name: string,
/** The entity id */
entity_id: number,
/** The time the document was created */
startDate: UnixTimestamp,
/** The time the document was updated */
lastUpdate: UnixTimestamp,
}
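
An illustrative literal (all values invented; casts follow the nominal
types defined above):

const meta: DocumentMeta =
{
    id:          <DocumentId>123456,
    entity_name: 'Some Agency, LLC',
    entity_id:   4321,
    startDate:   <UnixTimestamp>1575000000,
    lastUpdate:  <UnixTimestamp>1576181102,
};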


@ -0,0 +1,27 @@
/**
* Amqp error
*
* Copyright (C) 2010-2019 R-T Specialty, LLC.
*
* This file is part of the Liza Data Collection Framework.
*
* liza is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* This still uses ease.js because it does a good job of transparently
* creating Error subtypes.
*/
const { Class } = require( 'easejs' );
export const AmqpError = Class( 'AmqpError' ).extend( Error, {} );


@ -0,0 +1,27 @@
/**
* Dao error
*
* Copyright (C) 2010-2019 R-T Specialty, LLC.
*
* This file is part of the Liza Data Collection Framework.
*
* liza is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* This still uses ease.js because it does a good job of transparently
* creating Error subtypes.
*/
const { Class } = require( 'easejs' );
export const DaoError = Class( 'DaoError' ).extend( Error, {} );
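
Both error types behave as ordinary Error subtypes, which is why ease.js is
retained here; a brief usage sketch (message text invented):

import { DaoError } from '../error/DaoError';

// ease.js preserves the Error prototype chain, so instanceof works
const e = new DaoError( 'Failed to save quote' );
console.log( e instanceof Error );  // true
console.log( e.message );           // 'Failed to save quote'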


@ -24,6 +24,7 @@
import { Program } from "../program/Program";
import { Quote, QuoteId } from "./Quote";
import { QuoteDataBucket } from "../bucket/QuoteDataBucket";
import { PositiveInteger } from "../numeric";
export declare class BaseQuote implements Quote
@ -98,5 +99,55 @@ export declare class BaseQuote implements Quote
*
* @return the data bucket
*/
getBucket(): QuoteDataBucket
getBucket(): QuoteDataBucket;
/**
* Retrieves the reason for an explicit lock
*
* @return lock reason
*/
getExplicitLockReason(): string;
/**
* Returns the maximum step to which the explicit lock applies
*
* If no step restriction is set, then 0 will be returned.
*
* @return locked max step or 0 if not applicable
*/
getExplicitLockStep(): PositiveInteger;
/**
* Returns whether the quote has been imported
*
* @return true if imported, otherwise false
*/
isImported(): boolean;
/**
* Returns whether the quote has been bound
*
* @return true if bound, otherwise false
*/
isBound(): boolean;
/**
* Returns the id of the highest step the quote has reached
*
* @return top visited step id
*/
getTopVisitedStepId(): PositiveInteger;
/**
* Returns the id of the highest step the quote has saved
*
* @return top saved step id
*/
getTopSavedStepId(): PositiveInteger;
}


@ -69,7 +69,7 @@ const {
DocumentServer,
db: {
MongoServerDao,
MongoServerDao: { MongoServerDao },
},
lock: {
@ -126,8 +126,8 @@ exports.post_rate_publish = {};
exports.init = function( logger, enc_service, conf )
{
var db = _createDB( logger );
const dao = MongoServerDao( db );
var db = _createDB( logger );
const dao = new MongoServerDao( db );
db.collection( 'quotes', function( err, collection )
{


@ -19,83 +19,56 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
var Class = require( 'easejs' ).Class,
EventEmitter = require( '../../events' ).EventEmitter,
ServerDao = require( './ServerDao' ).ServerDao;
import { ServerDao, Callback } from "./ServerDao";
import { MongoCollection, MongoUpdate, MongoDb } from "mongodb";
import { PositiveInteger } from "../../numeric";
import { ServerSideQuote } from "../quote/ServerSideQuote";
import { QuoteId } from "../../document/Document";
import { WorksheetData } from "../rater/Rater";
const EventEmitter = require( 'events' ).EventEmitter;
type ErrorCallback = ( err: NullableError ) => void;
/**
* Uses MongoDB as a data store
*/
module.exports = Class( 'MongoServerDao' )
.implement( ServerDao )
.extend( EventEmitter,
export class MongoServerDao extends EventEmitter implements ServerDao
{
/**
* Collection used to store quotes
* @type String
*/
'const COLLECTION': 'quotes',
/** Collection used to store quotes */
readonly COLLECTION: string = 'quotes';
/**
* Sequence (auto-increment) collection
* @type {string}
*/
'const COLLECTION_SEQ': 'seq',
/** Sequence (auto-increment) collection */
readonly COLLECTION_SEQ: string = 'seq';
/**
* Sequence key for quote ids
*
* @type {string}
* @const
*/
'const SEQ_QUOTE_ID': 'quoteId',
/** Sequence key for quote ids */
readonly SEQ_QUOTE_ID: string = 'quoteId';
/**
* Sequence quoteId default
*
* @type {number}
* @const
*/
'const SEQ_QUOTE_ID_DEFAULT': 200000,
/** Sequence quoteId default */
readonly SEQ_QUOTE_ID_DEFAULT: number = 200000;
/**
* Database instance
* @type Mongo.Db
*/
'private _db': null,
/** Whether the DAO is initialized and ready to be used */
private _ready: boolean = false;
/**
* Whether the DAO is initialized and ready to be used
* @type Boolean
*/
'private _ready': false,
/** Collection to save data to */
private _collection?: MongoCollection | null;
/**
* Collection to save data to
* @type null|Collection
*/
'private _collection': null,
/**
* Collection to read sequences (auto-increments) from
* @type {null|Collection}
*/
'private _seqCollection': null,
/** Collection to read sequences (auto-increments) from */
private _seqCollection?: MongoCollection | null;
/**
* Initializes DAO
*
* @param {Mongo.Db} db mongo database connection
*
* @return undefined
*/
'public __construct': function( db )
constructor(
private readonly _db: MongoDb
)
{
this._db = db;
},
super();
}
/**
@ -108,12 +81,12 @@ module.exports = Class( 'MongoServerDao' )
*
* @return MongoServerDao self to allow for method chaining
*/
'public init': function( callback )
init( callback: () => void ): this
{
var dao = this;
// map db error event (on connection error) to our connectError event
this._db.on( 'error', function( err )
this._db.on( 'error', function( err: Error )
{
dao._ready = false;
dao._collection = null;
@ -123,7 +96,7 @@ module.exports = Class( 'MongoServerDao' )
this.connect( callback );
return this;
},
}
/**
@ -136,12 +109,12 @@ module.exports = Class( 'MongoServerDao' )
*
* @return MongoServerDao self to allow for method chaining
*/
'public connect': function( callback )
connect( callback: () => void ): this
{
var dao = this;
// attempt to connect to the database
this._db.open( function( err, db )
this._db.open( function( err: any, db: any )
{
// if there was an error, don't bother with anything else
if ( err )
@ -176,84 +149,97 @@ module.exports = Class( 'MongoServerDao' )
}
// quotes collection
db.collection( dao.__self.$('COLLECTION'), function( err, collection )
{
// for some reason this gets called more than once
if ( collection == null )
{
return;
}
// initialize indexes
collection.createIndex(
[ ['id', 1] ],
true,
function( err, index )
db.collection(
dao.COLLECTION,
function(
_err: any,
collection: MongoCollection,
) {
// for some reason this gets called more than once
if ( collection == null )
{
// mark the DAO as ready to be used
dao._collection = collection;
check_ready();
return;
}
);
});
// initialize indexes
collection.createIndex(
[ ['id', 1] ],
true,
function(
_err: NullableError,
_index: { [P: string]: any },
)
{
// mark the DAO as ready to be used
dao._collection = collection;
check_ready();
}
);
}
);
// seq collection
db.collection( dao.__self.$('COLLECTION_SEQ'), function( err, collection )
{
if ( err )
{
dao.emit( 'seqError', err );
return;
}
if ( collection == null )
{
return;
}
dao._seqCollection = collection;
// has the sequence we'll be referencing been initialized?
collection.find(
{ _id: dao.__self.$('SEQ_QUOTE_ID') },
{ limit: 1 },
function( err, cursor )
db.collection(
dao.COLLECTION_SEQ,
function(
err: Error,
collection: MongoCollection,
) {
if ( err )
{
if ( err )
{
dao.initQuoteIdSeq( check_ready )
return;
}
dao.emit( 'seqError', err );
return;
}
cursor.toArray( function( err, data )
if ( collection == null )
{
return;
}
dao._seqCollection = collection;
// has the sequence we'll be referencing been initialized?
collection.find(
{ _id: dao.SEQ_QUOTE_ID },
{ limit: <PositiveInteger>1 },
function( err: NullableError, cursor )
{
if ( data.length == 0 )
if ( err )
{
dao.initQuoteIdSeq( check_ready );
dao._initQuoteIdSeq( check_ready )
return;
}
check_ready();
});
}
);
});
cursor.toArray( function( _err: Error, data: any[] )
{
if ( data.length == 0 )
{
dao._initQuoteIdSeq( check_ready );
return;
}
check_ready();
});
}
);
}
);
});
return this;
},
}
'public initQuoteIdSeq': function( callback )
private _initQuoteIdSeq( callback: () => void )
{
var dao = this;
this._seqCollection.insert(
this._seqCollection!.insert(
{
_id: this.__self.$('SEQ_QUOTE_ID'),
val: this.__self.$('SEQ_QUOTE_ID_DEFAULT'),
_id: this.SEQ_QUOTE_ID,
val: this.SEQ_QUOTE_ID_DEFAULT,
},
function( err, docs )
function( err: NullableError, _docs: any )
{
if ( err )
{
@ -261,11 +247,11 @@ module.exports = Class( 'MongoServerDao' )
return;
}
dao.emit( 'seqInit', this.__self.$('SEQ_QUOTE_ID') );
callback.call( this );
dao.emit( 'seqInit', dao.SEQ_QUOTE_ID );
callback.call( dao );
}
);
},
}
/**
@ -281,15 +267,17 @@ module.exports = Class( 'MongoServerDao' )
* @param Function failure_callback function to call if save fails
* @param Object save_data quote data to save (optional)
* @param Object push_data quote data to push (optional)
*
* @return MongoServerDao self to allow for method chaining
*/
'public saveQuote': function(
quote, success_callback, failure_callback, save_data, push_data
)
saveQuote(
quote: ServerSideQuote,
success_callback: Callback,
failure_callback: Callback,
save_data?: any,
push_data?: any,
): this
{
var dao = this;
var meta = {};
var dao = this;
var meta: Record<string, any> = {};
// if we're not ready, then we can't save the quote!
if ( this._ready === false )
@ -301,7 +289,7 @@ module.exports = Class( 'MongoServerDao' )
);
failure_callback.call( this, quote );
return;
return dao;
}
if ( save_data === undefined )
@ -321,6 +309,7 @@ module.exports = Class( 'MongoServerDao' )
save_data.id = id;
save_data.pver = quote.getProgramVersion();
save_data.importDirty = 1;
save_data.published = false;
save_data.lastPremDate = quote.getLastPremiumDate();
save_data.initialRatedDate = quote.getRatedDate();
save_data.explicitLock = quote.getExplicitLockReason();
@ -349,14 +338,14 @@ module.exports = Class( 'MongoServerDao' )
// update the quote data if it already exists (same id), otherwise
// insert it
this._collection.update( { id: id },
this._collection!.update( { id: id },
document,
// create record if it does not yet exist
{ upsert: true },
// on complete
function( err, docs )
function( err, _docs )
{
// if an error occurred, then we cannot continue
if ( err )
@ -381,7 +370,7 @@ module.exports = Class( 'MongoServerDao' )
);
return this;
},
}
/**
@ -391,21 +380,24 @@ module.exports = Class( 'MongoServerDao' )
* @param {Object} data quote data
* @param {Function} scallback successful callback
* @param {Function} fcallback failure callback
*
* @return {MongoServerDao} self
*/
'public mergeData': function( quote, data, scallback, fcallback )
mergeData(
quote: ServerSideQuote,
data: MongoUpdate,
scallback: Callback,
fcallback: Callback,
): this
{
// we do not want to alter the original data; use it as a prototype
var update = data;
// save the stack so we can track this call via the oplog
var _self = this;
this._collection.update( { id: quote.getId() },
this._collection!.update( { id: quote.getId() },
{ '$set': update },
{},
function( err, docs )
function( err, _docs )
{
if ( err )
{
@ -427,7 +419,7 @@ module.exports = Class( 'MongoServerDao' )
);
return this;
},
}
/**
@ -441,9 +433,14 @@ module.exports = Class( 'MongoServerDao' )
*
* @return {MongoServerDao} self
*/
'public mergeBucket': function( quote, data, scallback, fcallback )
mergeBucket(
quote: ServerSideQuote,
data: MongoUpdate,
success: Callback,
failure: Callback,
): this
{
var update = {};
var update: MongoUpdate = {};
for ( var field in data )
{
@ -455,8 +452,8 @@ module.exports = Class( 'MongoServerDao' )
update[ 'data.' + field ] = data[ field ];
}
return this.mergeData( quote, update, scallback, fcallback );
},
return this.mergeData( quote, update, success, failure );
}
/**
@ -471,8 +468,10 @@ module.exports = Class( 'MongoServerDao' )
*
* @return MongoServerDao self
*/
'public saveQuoteState': function(
quote, success_callback, failure_callback
saveQuoteState(
quote: ServerSideQuote,
success_callback: Callback,
failure_callback: Callback,
)
{
var update = {
@ -484,10 +483,15 @@ module.exports = Class( 'MongoServerDao' )
return this.mergeData(
quote, update, success_callback, failure_callback
);
},
}