diff --git a/.env b/.env index 983d285..6f8e8c3 100644 --- a/.env +++ b/.env @@ -1,6 +1,6 @@ hostname=localhost port=5672 -username= +username=quote_referral password= vhost=dev exchange=quoteupdate diff --git a/bin/delta-processor.ts b/bin/delta-processor.ts index 535bb51..836cb83 100644 --- a/bin/delta-processor.ts +++ b/bin/delta-processor.ts @@ -19,4 +19,150 @@ * along with this program. If not, see . */ -console.log( 'Nothing to see here yet.' ); +import { AmqpConfig } from "../src/system/AmqpPublisher"; +import { MongoDeltaDao } from "../src/system/db/MongoDeltaDao"; +import { DeltaProcessor } from "../src/system/DeltaProcessor"; +import { DeltaPublisher } from "../src/system/DeltaPublisher"; +import { MongoDb, MongoDbConfig } from "../src/types/mongodb"; +import { DeltaLogger } from "../src/system/DeltaLogger"; +import { EventEmitter } from "events"; +import { EventDispatcher } from "../src/system/event/EventDispatcher"; +import { EventSubscriber } from "../src/system/event/EventSubscriber"; +// import { MetricsCollector } from "../src/system/MetricsCollector"; + +const { + Db: MongoDb, + Server: MongoServer, + Connection: MongoConnection, + ReplServers: ReplSetServers, +} = require( 'mongodb/lib/mongodb' ); + +// TODO: fix this +process.env.hostname = 'localhost'; +process.env.port = '5672'; +process.env.username = 'quote_referral'; +process.env.password = 'Et7iojahwo4aePie9Cahng7Chu5eim4E'; +process.env.vhost = 'quote'; +process.env.exchange = 'quoteupdate'; + + +// Environment variables +const amqp_conf = _getAmqpConfig( process.env ); +const db_conf = _getMongoConfig( process.env ); +const env = process.env.NODE_ENV || 'Unknown Environment'; + +// Event handling +const event_emitter = new EventEmitter(); +const event_dispatcher = new EventDispatcher( event_emitter ); +const event_subscriber = new EventSubscriber( event_emitter ); + +// Event subscribers +new DeltaLogger( env, event_subscriber, ts_ctr ).init(); +// new MetricsCollector( env, event_subscriber ); + +// Instantiate classes for processor +const db = _createDB( db_conf ); +const dao = new MongoDeltaDao( db ); +const publisher = new DeltaPublisher( amqp_conf, event_dispatcher, ts_ctr ); +const processor = new DeltaProcessor( dao, publisher, event_dispatcher ); + +// If the dao intializes successfully then process on a two second interval +const interval_ms = 2000; + +dao.init() +.then( _ => { setInterval( () => { processor.process(); }, interval_ms ); } ) +.catch( err => { console.error( 'Mongo Error: ' + err ); } ); + + +/** Timestamp constructor + * + * @return a timestamp + */ +function ts_ctr(): UnixTimestamp +{ + return Math.floor( new Date().getTime() / 1000 ); +} + + +/** + * Create the database connection + * + * @param conf - the configuration from the environment + * + * @return the mongodb connection + */ +function _createDB( conf: MongoDbConfig ): MongoDb +{ + if( conf.ha ) + { + var mongodbPort = conf.port || MongoConnection.DEFAULT_PORT; + var mongodbReplSet = conf.replset || 'rs0'; + var dbServers = new ReplSetServers( + [ + new MongoServer( conf.host_a, conf.port_a || mongodbPort), + new MongoServer( conf.host_b, conf.port_b || mongodbPort), + ], + {rs_name: mongodbReplSet, auto_reconnect: true} + ); + } + else + { + var dbServers = new MongoServer( + conf.host || '127.0.0.1', + conf.port || MongoConnection.DEFAULT_PORT, + {auto_reconnect: true} + ); + } + var db = new MongoDb( + 'program', + dbServers, + {native_parser: false, safe: false} + ); + return db; +} + + +/** + * Create a mongodb configuration from the environment + * + * @param env - the environment variables + * + * @return the mongo configuration + */ +function _getMongoConfig( env: any ): MongoDbConfig +{ + return { + "port": +( env.MONGODB_PORT || 0 ), + "ha": +( env.LIZA_MONGODB_HA || 0 ) == 1, + "replset": env.LIZA_MONGODB_REPLSET, + "host": env.MONGODB_HOST, + "host_a": env.LIZA_MONGODB_HOST_A, + "port_a": +( env.LIZA_MONGODB_PORT_A || 0 ), + "host_b": env.LIZA_MONGODB_HOST_B, + "port_b": +( env.LIZA_MONGODB_PORT_B || 0 ), + }; +} + + +/** + * Create an amqp configuration from the environment + * + * @param env - the environment variables + * + * @return the amqp configuration + */ +function _getAmqpConfig( env: any ): AmqpConfig +{ + return { + "protocol": "amqp", + "hostname": env.hostname, + "port": +( env.port || 0 ), + "username": env.username, + "password": env.password, + "locale": "en_US", + "frameMax": 0, + "heartbeat": 0, + "vhost": env.vhost, + "exchange": env.exchange, + }; +} \ No newline at end of file diff --git a/conf/vanilla-server.json b/conf/vanilla-server.json index e924fb1..523bb7b 100644 --- a/conf/vanilla-server.json +++ b/conf/vanilla-server.json @@ -51,17 +51,18 @@ "domain": "" }, "postRatePublish": { - "protocol": "amqp", - "hostname": "localhost", - "port": 5672, - "username": "", - "password": "", - "locale": "en_US", - "frameMax": 0, - "heartbeat": 0, - "vhost": "/", - "exchange": "postrate" + "protocol": "amqp", + "hostname": "localhost", + "port": 5672, + "username": "", + "password": "", + "locale": "en_US", + "frameMax": 0, + "heartbeat": 0, + "vhost": "/", + "queueName": "postrate" } + }, "c1export": { "host": "localhost", diff --git a/src/system/AmqpPublisher.ts b/src/system/AmqpPublisher.ts index bfd6dc3..019ea0d 100644 --- a/src/system/AmqpPublisher.ts +++ b/src/system/AmqpPublisher.ts @@ -38,5 +38,5 @@ export interface AmqpPublisher * * @param delta - The delta to publish */ - publish( delta: DeltaResult ): void; + publish( delta: DeltaResult ): Promise; } diff --git a/src/system/DeltaLogger.ts b/src/system/DeltaLogger.ts new file mode 100644 index 0000000..50e616e --- /dev/null +++ b/src/system/DeltaLogger.ts @@ -0,0 +1,135 @@ +/** + * Delta logger + * + * Copyright (C) 2010-2019 R-T Specialty, LLC. + * + * This file is part of liza. + * + * liza is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + * Logger for delta events + */ + +import { EventSubscriber } from "./event/EventSubscriber"; + +enum LogLevel { + DEBUG, + INFO, + NOTICE, + WARNING, + ERROR, + CRITICAL, + ALERT, + EMERGENCY, +}; + +declare type StructuredLog = { + message: string; + timestamp: UnixTimestamp; + service: string; + env: string; + severity: string; +} + +export class DeltaLogger +{ + /** + * Initialize delta logger + * + * @param _env - The environment ( dev, test, demo, live ) + * @param _subscriber - An event subscriber + * @param _ts_ctr - a timestamp constructor + */ + constructor( + private readonly _env: string, + private readonly _subscriber: EventSubscriber, + private readonly _ts_ctr : () => UnixTimestamp, + ) {} + + + /** + * Initialize the logger to look for specific events + */ + init(): void + { + this._registerEvent( 'document-processed', LogLevel.NOTICE ); + this._registerEvent( 'delta-publish', LogLevel.NOTICE ); + this._registerEvent( 'avro-parse-err', LogLevel.ERROR ); + this._registerEvent( 'mongodb-err', LogLevel.ERROR ); + this._registerEvent( 'publish-err', LogLevel.ERROR ); + } + + + /** + * Register an event at a specific log level + * + * @param event_id - the event id + * @param level - the log level + */ + private _registerEvent( event_id: string, level: LogLevel ): void + { + const logF = this._getLogLevelFunction( level ) + + this._subscriber.subscribe( event_id, logF ); + } + + + /** + * Get a logging function for the specified log level + * + * @param event_id - the event id + * + * @return the function to log with + */ + private _getLogLevelFunction( level: LogLevel ): ( str: string ) => void + { + switch( level ) + { + case LogLevel.DEBUG: + case LogLevel.INFO: + return ( _ ) => console.info( this._formatLog( _, level ) ); + case LogLevel.NOTICE: + return ( _ ) => console.log( this._formatLog( _, level ) ); + case LogLevel.WARNING: + return ( _ ) => console.warn( this._formatLog( _, level ) ); + case LogLevel.ERROR: + case LogLevel.CRITICAL: + case LogLevel.ALERT: + case LogLevel.EMERGENCY: + return ( _ ) => console.error( this._formatLog( _, level ) ); + default: + return ( _ ) => console.log( "UNKNOWN LOG LEVEL: " + _ ); + } + } + + + /** + * Get structured log object + * + * @param str - the string to log + * @param level - the log level + * + * @returns a structured logging object + */ + private _formatLog( str: string, level: LogLevel ): StructuredLog + { + return { + message: str, + timestamp: this._ts_ctr(), + service: 'quote-server', + env: this._env, + severity: LogLevel[level], + }; + } +} diff --git a/src/system/DeltaProcessor.ts b/src/system/DeltaProcessor.ts index 678e700..39cdf3d 100644 --- a/src/system/DeltaProcessor.ts +++ b/src/system/DeltaProcessor.ts @@ -24,7 +24,7 @@ import { MongoDeltaType } from "../system/db/MongoDeltaDao"; import { DeltaResult } from "../bucket/delta"; import { DocumentId } from "../document/Document"; import { AmqpPublisher } from "./AmqpPublisher"; - +import { EventDispatcher } from "./event/EventDispatcher"; /** * Process deltas for a quote and publish to a queue @@ -41,11 +41,14 @@ export class DeltaProcessor /** * Initialize processor * - * @param _collection Mongo collection + * @param _dao - Mongo collection + * @param _publisher - Amqp Publisher + * @param _dispatcher - Event dispatcher instance */ constructor( - private readonly _dao: DeltaDao, - private readonly _publisher: AmqpPublisher, + private readonly _dao: DeltaDao, + private readonly _publisher: AmqpPublisher, + private readonly _dispatcher: EventDispatcher ) {} @@ -56,31 +59,48 @@ export class DeltaProcessor { let self = this; - this._dao.getUnprocessedDocuments( function( docs ) + this._dao.getUnprocessedDocuments() + .then( docs => { - docs.forEach( doc => { - - const deltas = self.getTimestampSortedDeltas( doc ); - - deltas.forEach( delta => { - - self._publisher.publish( delta ); + docs.forEach( doc => + { + const deltas = self.getTimestampSortedDeltas( doc ); + const doc_id: DocumentId = doc.id; + const last_updated_ts = doc.lastUpdate; + deltas.forEach( delta => + { + self._publisher.publish( delta ) + .then( _ => + { + self._dao.advanceDeltaIndex( doc_id, delta.type ); + } ) + .catch( _ => + { + // TODO: blow up? + } ); }); - const last_updated_ts = doc.lastUpdated; - const doc_id: DocumentId = doc.id; - - self._dao.markDocumentAsProcessed( - doc_id, - last_updated_ts, - function( err, markedSuccessfully ) - { - console.log( err, markedSuccessfully ); - }, - ); + self._dao.markDocumentAsProcessed( doc_id, last_updated_ts ) + .then( _ => + { + this._dispatcher.dispatch( + 'document-processed', + 'Deltas on document ' + doc_id + ' processed ' + + 'successfully. Document has been marked as ' + + 'completely processed.' + ); + } ) + .catch( err => + { + this._dispatcher.dispatch( 'mongodb-err', err ); + } ); }); - }); + } ) + .catch( err => + { + this._dispatcher.dispatch( 'mongodb-err', err ); + } ); } @@ -91,9 +111,7 @@ export class DeltaProcessor * * @return a list of deltas sorted by timestamp */ - getTimestampSortedDeltas( - doc: any, - ): DeltaResult[] + getTimestampSortedDeltas( doc: any ): DeltaResult[] { const data_deltas = this.getDeltas( doc, this.DELTA_RATEDATA ); const ratedata_deltas = this.getDeltas( doc, this.DELTA_DATA ); @@ -113,32 +131,26 @@ export class DeltaProcessor * * @return a trimmed list of deltas */ - getDeltas( - doc: any, - type: MongoDeltaType, - ): DeltaResult[] + getDeltas( doc: any, type: MongoDeltaType ): DeltaResult[] { - // Get objects so we can get the index by type - const deltas_obj = doc.rdelta || {}; + const deltas_obj = doc.rdelta || {}; + const deltas: DeltaResult[] = deltas_obj[ type ] || []; - // Get type specific deltas + // Get type specific delta index let last_published_index = 0; if ( doc.lastPublishDelta ) { - const last_published_indexes = doc.lastPublishDelta; - - last_published_index = last_published_indexes[ type ] || 0; + last_published_index = doc.lastPublishDelta[ type ] || 0; } - const deltas: DeltaResult[] = deltas_obj[ type ] || []; - // Only return the unprocessed deltas const deltas_trimmed = deltas.slice( last_published_index ); // Mark each delta with its type - deltas_trimmed.forEach( delta => { + deltas_trimmed.forEach( delta => + { delta.type = type; - }); + } ); return deltas_trimmed; } @@ -148,14 +160,11 @@ export class DeltaProcessor * Sort an array of deltas by timestamp * * @param a - The first delta to compare - * @param a - The second delta to compare + * @param b - The second delta to compare * * @return a sort value */ - private _sortByTimestamp( - a: DeltaResult, - b: DeltaResult, - ): number + private _sortByTimestamp( a: DeltaResult, b: DeltaResult ): number { if ( a.timestamp < b.timestamp ) { @@ -168,26 +177,4 @@ export class DeltaProcessor return 0; } - - - /** - * Generate amqp config from environment variables - * - * @returns the amqp configuration - */ - // generateConfigFromEnv(): AmqpConfig - // { - // return { - // "protocol": "amqp", - // "hostname": process.env.hostname, - // "port": process.env.port, - // "username": process.env.username, - // "password": process.env.password, - // "locale": "en_US", - // "frameMax": 0, - // "heartbeat": 0, - // "vhost": process.env.vhost, - // "exchange": process.env.exchange, - // }; - // } } \ No newline at end of file diff --git a/src/system/DeltaPublisher.ts b/src/system/DeltaPublisher.ts index 57a74b6..c67cde4 100644 --- a/src/system/DeltaPublisher.ts +++ b/src/system/DeltaPublisher.ts @@ -21,8 +21,9 @@ * Publish delta message to a queue */ -import { AmqpPublisher } from "./AmqpPublisher"; -import { DeltaResult } from "../bucket/delta"; +import { AmqpPublisher } from './AmqpPublisher'; +import { DeltaResult } from '../bucket/delta'; +import { EventDispatcher } from './event/EventDispatcher'; import { connect as amqpConnect, Options, @@ -31,7 +32,6 @@ import { const avro = require( 'avro-js' ); - export interface AmqpConfig extends Options.Connect { /** The name of a queue or exchange to publish to */ exchange: string; @@ -41,24 +41,26 @@ export interface AmqpConfig extends Options.Connect { export class DeltaPublisher implements AmqpPublisher { /** The path to the avro schema */ - readonly SCHEMA_PATH = './avro/schema.avsc'; + readonly SCHEMA_PATH = __dirname + '/avro/schema.avsc'; /** A mapping of which delta type translated to which avro event */ readonly DELTA_MAP: Record = { - data: 'rate', - ratedata: 'update', + data: 'STEP_SAVE', + ratedata: 'RATE', }; /** - * Initialize trait + * Initialize publisher * - * @param _conf - amqp configuration - * @param _logger - logger instance + * @param _conf - amqp configuration + * @param _emitter - event emitter instance + * @param _ts_ctr - a timestamp constructor */ constructor( - private readonly _conf: AmqpConfig, - private readonly _logger: any + private readonly _conf: AmqpConfig, + private readonly _dispatcher: EventDispatcher, + private readonly _ts_ctr : () => UnixTimestamp, ) {} @@ -66,35 +68,65 @@ export class DeltaPublisher implements AmqpPublisher * Publish quote message to exchange post-rating * * @param delta - The delta to publish + * + * @return whether the message was published successfully */ - publish( delta: DeltaResult ): void + publish( delta: DeltaResult ): Promise { - // check both as we transition from one to the other const exchange = this._conf.exchange; - amqpConnect( this._conf ) + return new Promise( ( resolve, reject ) => + { + amqpConnect( this._conf ) .then( conn => { setTimeout( () => conn.close(), 10000 ); return conn.createChannel(); } ) - .then( ch => { + .then( ch => + { ch.assertExchange( exchange, 'fanout', { durable: true } ); - return this._sendMessage( ch, exchange, delta ); + return this.sendMessage( ch, exchange, delta ); } ) - .then( () => this._logger.log( - this._logger.PRIORITY_INFO, - "Published " + delta.type + " delta with timestamp '" + - delta.timestamp + "' to quote-update exchange '"+ - exchange + "'" - ) ) - .catch( e => this._logger.log( - this._logger.PRIORITY_ERROR, - "Error publishing " + delta.type + " delta with timestamp '" + - delta.timestamp + "' to quote-update exchange '"+ - exchange + "'" + ": " + e - ) ); + .then( sentSuccessfully => + { + console.log('sentSuccessfully', sentSuccessfully); + if ( sentSuccessfully ) + { + this._dispatcher.dispatch( + 'delta-publish', + "Published " + delta.type + " delta with ts '" + + delta.timestamp + "' to '" + exchange + + '" exchange', + ); + + resolve(); + } + else + { + this._dispatcher.dispatch( + 'publish-err', + "Error publishing " + delta.type + " delta with ts '" + + delta.timestamp + "' to '" + exchange + + "' exchange", + ); + + reject(); + } + } ) + .catch( e => + { + this._dispatcher.dispatch( + 'publish-err', + "Error publishing " + delta.type + " delta with ts '" + + delta.timestamp + '" to "' + exchange + "' exchange '" + + e, + ) + + reject(); + } ); + } ); } @@ -107,7 +139,7 @@ export class DeltaPublisher implements AmqpPublisher * * @return whether publish was successful */ - _sendMessage( + sendMessage( channel: Channel, exchange: string, delta: DeltaResult, @@ -118,14 +150,48 @@ export class DeltaPublisher implements AmqpPublisher created: Date.now(), }; - const event_id = this.DELTA_MAP[ delta.type ]; + // Convert all delta datums to string for avro + const delta_data = this.avroFormat( delta.data ); + const event_id = this.DELTA_MAP[ delta.type ]; const data = { - delta: delta, - event: event_id, + event: { + id: event_id, + ts: this._ts_ctr(), + actor: 'SERVER', + step: null, + }, + document: { + id: 123123, // Fix + }, + session: { + entity_name: 'Foobar', // Fix + entity_id: 123123, // Fix + }, + data: { + Data: { + bucket: delta_data, + }, + }, + delta: { + Data: { + bucket: delta_data, + }, + }, + program: { + Program: { + id: 'quote_server', + version: 'dadaddwafdwa', // Fix + }, + }, }; - const avro_buffer = this._avroEncode( data ); + const avro_buffer = this.avroEncode( data ); + + if ( !avro_buffer ) + { + return false; + } // we don't use a routing key; fanout exchange const routing_key = ''; @@ -144,14 +210,91 @@ export class DeltaPublisher implements AmqpPublisher * * @param data - the data to encode * - * @return the avro buffer + * @return the avro buffer or null if there is an error */ - _avroEncode( data: Record ): Buffer + avroEncode( data: Record ): Buffer | null { - const type = avro.parse( this.SCHEMA_PATH ); + let buffer = null; - const buffer = type.toBuffer( data ); + try + { + const type = avro.parse( this.SCHEMA_PATH ); + buffer = type.toBuffer( data ); + } + catch( e ) + { + this._dispatcher.dispatch( + 'avro-parse-err', + 'Error encoding data to avro: ' + e, + ); + } return buffer; } + + + /** + * Format the data for avro by add type specifications to the data + * + * @param data - the data to format + * + * @return the formatted data + */ + avroFormat( data: any, top_level: boolean = true ): any + { + let data_formatted: any = {}; + + switch( typeof( data ) ) + { + case 'object': // Typescript treats arrays as objects + if ( data == null ) + { + return null; + } + else if ( Array.isArray( data ) ) + { + let arr: any[] = []; + + data.forEach( ( datum ) => + { + arr.push( this.avroFormat( datum, false ) ); + } ); + + data_formatted = ( top_level ) + ? arr + : { 'array': arr }; + } + else + { + let datum_formatted: any = {}; + + Object.keys( data).forEach( ( key: string ) => + { + const datum = this.avroFormat( data[ key ], false ); + + datum_formatted[ key ] = datum; + + } ); + + data_formatted = ( top_level ) + ? datum_formatted + : { "map": datum_formatted }; + } + break; + + case 'boolean': + return { 'boolean': data }; + + case 'number': + return { 'double': data }; + + case 'string': + return { 'string': data }; + + case 'undefined': + return null; + } + + return data_formatted; + } } diff --git a/src/system/MetricsCollector.ts b/src/system/MetricsCollector.ts new file mode 100644 index 0000000..583b5a6 --- /dev/null +++ b/src/system/MetricsCollector.ts @@ -0,0 +1,80 @@ +/** + * Metrics Collector + * + * Copyright (C) 2010-2019 R-T Specialty, LLC. + * + * This file is part of liza. + * + * liza is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + * Collect Metrics for Prometheus + */ + +import { EventSubscriber } from "./event/EventSubscriber"; + +const client = require('prom-client'); + +declare type MetricStructure = { + path: string; + code: number; + service: string; + env: string; +} + +export class MetricsCollector +{ + /** + * Initialize delta logger + */ + constructor( + private readonly _env: string, + private readonly _subscriber: EventSubscriber, + ) {} + + + /** + * Initialize the logger to look for specific events + */ + init(): void + { + const collectDefaultMetrics = client.collectDefaultMetrics; + + console.log( this._subscriber, collectDefaultMetrics) + this._formatLog( '', 123 ); + // this._registerEvent( 'document-processed', LogLevel.NOTICE ); + // this._registerEvent( 'delta-publish', LogLevel.NOTICE ); + // this._registerEvent( 'avro-parse-err', LogLevel.ERROR ); + // this._registerEvent( 'mongodb-err', LogLevel.ERROR ); + // this._registerEvent( 'publish-err', LogLevel.ERROR ); + } + + + /** + * Get structured metric object + * + * @param path - the endpoint being hit + * @param code - the response code + * + * @returns a structured logging object + */ + private _formatLog( path: string, code: number ): MetricStructure + { + return { + path: path, + code: code, + service: 'quote-server', + env: this._env, + }; + } +} diff --git a/src/system/avro/schema.avsc b/src/system/avro/schema.avsc index ee793a6..4a9a609 100644 --- a/src/system/avro/schema.avsc +++ b/src/system/avro/schema.avsc @@ -34,28 +34,31 @@ }, { "name": "step", - "type": { - "type": "record", - "name": "EventStep", - "fields": [ - { - "name": "transition", - "type": { - "type": "enum", - "name": "EventStepTransition", - "symbols": [ "BACK", "FORWARD", "END" ] + "type":[ + "null", + { + "type": "record", + "name": "EventStep", + "fields": [ + { + "name": "transition", + "type": { + "type": "enum", + "name": "EventStepTransition", + "symbols": [ "BACK", "FORWARD", "END" ] + } + }, + { + "name": "src", + "type": "string" + }, + { + "name": "dest", + "type": "string" } - }, - { - "name": "src", - "type": "string" - }, - { - "name": "dest", - "type": "string" - } - ] - } + ] + } + ] } ] } @@ -70,20 +73,6 @@ { "name": "id", "type": "int" - }, - { - "name": "created", - "type": "long", - "logicalType": "timestamp-millis" - }, - { - "name": "modified", - "type": "long", - "logicalType": "timestamp-millis" - }, - { - "name": "top_visited_step", - "type": "string" } ] } @@ -115,12 +104,56 @@ "fields": [ { "name": "bucket", - "type": { + "type":{ "type": "map", - "values": { - "type" : "array", - "items" : "string" - } + "values": [ + "null", + { + "type": "array", + "items": [ + "null", + "boolean", + "double", + "string", + { + "type": "array", + "items": [ + "null", + "boolean", + "double", + "string", + { + "type": "array", + "items": [ + "null", + "boolean", + "double", + "string" + ] + } + ] + }, + { + "type": "map", + "values": [ + "null", + "boolean", + "double", + "string", + { + "type": "map", + "values": [ + "null", + "boolean", + "double", + "string" + ] + } + ] + } + ] + } + ] } } ] diff --git a/src/system/db/DeltaDao.ts b/src/system/db/DeltaDao.ts index 53cd8f5..64e1f0f 100644 --- a/src/system/db/DeltaDao.ts +++ b/src/system/db/DeltaDao.ts @@ -28,7 +28,6 @@ */ import { DocumentId } from "../../document/Document"; -import { PositiveInteger } from "../../numeric"; /** Manage deltas */ @@ -39,23 +38,21 @@ export interface DeltaDao * * @return documents in need of processing */ - getUnprocessedDocuments( - callback: ( data: Record[] ) => void, - ): this; + getUnprocessedDocuments(): Promise[]> /** * Set the document's processed index * - * @param doc_id - The document whose index will be set - * @param index - The index to set + * @param doc_id - Document whose index will be set + * @param type - Delta type + * + * @return any errors that occured */ - advanceDeltaIndexByType( + advanceDeltaIndex( doc_id: DocumentId, type: string, - index: PositiveInteger, - callback: ( err: NullableError, indexHasAdvanced: boolean ) => void, - ): this; + ): Promise /** @@ -68,9 +65,8 @@ export interface DeltaDao * @return true if the document was successfully marked as processed */ markDocumentAsProcessed( - doc_id: DocumentId, - last_update_ts: UnixTimestamp, - callback: ( err: NullableError, markedSuccessfully: boolean ) => void, - ): this; + doc_id: DocumentId, + last_update_ts: UnixTimestamp, + ): Promise } diff --git a/src/system/db/MongoDeltaDao.ts b/src/system/db/MongoDeltaDao.ts index cebf453..443ae85 100644 --- a/src/system/db/MongoDeltaDao.ts +++ b/src/system/db/MongoDeltaDao.ts @@ -22,10 +22,8 @@ */ import { DocumentId } from "../../document/Document"; -import { PositiveInteger } from "../../numeric"; -import { MongoCollection } from "mongodb"; import { DeltaDao } from "./DeltaDao"; - +import { MongoCollection } from "mongodb"; export type MongoDeltaType = 'ratedata' | 'data'; @@ -33,56 +31,134 @@ export type MongoDeltaType = 'ratedata' | 'data'; /** Manage deltas */ export class MongoDeltaDao implements DeltaDao { + /** Collection used to store quotes */ + readonly COLLECTION: string = 'quotes'; + /** The ratedata delta type */ static readonly DELTA_RATEDATA: string = 'ratedata'; /** The data delta type */ static readonly DELTA_DATA: string = 'data'; + /** The mongo quotes collection */ + private _collection?: MongoCollection | null; + /** * Initialize connection * - * @param _collection Mongo collection + * @param _db Mongo db */ constructor( - private readonly _collection: MongoCollection, + private readonly _db: any, ) {} + /** + * Attempts to connect to the database + * + * connectError event will be emitted on failure. + * + * @return any errors that occured + */ + init(): Promise + { + var dao = this; + + return new Promise( ( resolve, reject ) => + { + // attempt to connect to the database + this._db.open( function( err: any, db: any ) + { + // if there was an error, don't bother with anything else + if ( err ) + { + // in some circumstances, it may just be telling us that we're + // already connected (even though the connection may have been + // broken) + if ( err.errno !== undefined ) + { + reject( 'Error opening mongo connection: ' + err ); + return; + } + } + + // quotes collection + db.collection( + dao.COLLECTION, + function( + _err: any, + collection: MongoCollection, + ) { + // for some reason this gets called more than once + if ( collection == null ) + { + return; + } + + // initialize indexes + collection.createIndex( + [ ['id', 1] ], + true, + function( err: any, _index: { [P: string]: any } ) + { + if ( err ) + { + reject( 'Error creating index: ' + err ); + return; + } + + // mark the DAO as ready to be used + dao._collection = collection; + resolve(); + return; + } + ); + } + ); + }); + } ); + } + + /** * Get documents in need of processing * * @return documents in need of processing */ - getUnprocessedDocuments( - callback: ( data: Record[] ) => void, - ): this + getUnprocessedDocuments(): Promise[]> { var self = this; - this._collection.find( - { published: false }, - {}, - function( _err, cursor ) + return new Promise( ( resolve, reject ) => + { + if ( !self._collection ) { - cursor.toArray( function( _err: NullableError, data: any[] ) - { - // was the quote found? - if ( data.length == 0 ) - { - callback.call( self, [] ); - - return; - } - - // return the quote data - callback.call( self, data ); - }); + reject( 'Database not ready' ); + return; } - ) - return this; + + this._collection!.find( + { published: false }, + {}, + function( _err, cursor ) + { + cursor.toArray( function( _err: NullableError, data: any[] ) + { + // was the quote found? + if ( data.length == 0 ) + { + resolve( [] ); + return; + } + + // return the quote data + resolve( data ); + }); + } + ) + } ); } @@ -91,42 +167,35 @@ export class MongoDeltaDao implements DeltaDao * * @param doc_id - Document whose index will be set * @param type - Delta type - * @param index - Index to set - * @param callback - Callback function */ - advanceDeltaIndexByType( + advanceDeltaIndex( doc_id: DocumentId, type: MongoDeltaType, - index: PositiveInteger, - callback: ( err: NullableError, indexAdvanced: boolean ) => void, - ): this + ): Promise { - var self = this; + return new Promise( ( resolve, reject ) => + { + const inc_data: Record = {}; - const set_data: Record = {}; + inc_data[ 'lastPublishDelta.' + type ] = 1; - set_data[ 'lastPublishDelta.' + type ] = index; - - this._collection.update( - { id: doc_id }, - { $set: set_data }, - { upsert: true }, - function( err ) - { - if ( err ) + this._collection!.update( + { id: doc_id }, + { $inc: inc_data }, + { upsert: false }, + function( err ) { - callback.call( self, err, false ); + if ( err ) + { + reject( 'Error advancing delta index: ' + err ) + return; + } + resolve(); return; } - - callback.call( self, null, true ); - - return; - } - ); - - return this; + ); + } ); } @@ -140,35 +209,30 @@ export class MongoDeltaDao implements DeltaDao * @return true if the document was successfully marked as processed */ markDocumentAsProcessed( - doc_id: DocumentId, - last_update_ts: UnixTimestamp, - callback: ( err: NullableError, indexAdvanced: boolean ) => void, - ): this + doc_id: DocumentId, + last_update_ts: UnixTimestamp, + ): Promise { - var self = this; - - this._collection.update( - { id: doc_id, lastUpdate: { $gt: last_update_ts } }, - { $set: { processed: true } }, - { upsert: false }, - function( err, result ) - { - if ( err ) + return new Promise( ( resolve, reject ) => + { + this._collection!.update( + { id: doc_id, lastUpdate: { $lte: last_update_ts } }, + { $set: { published: true } }, + { upsert: false }, + function( err ) { - callback.call( self, err, false ); + if ( err ) + { + reject( "Error marking document as processed: " + err ); + return; + } + resolve(); return; } + ); - console.log( '-------', result ); - - callback.call( self, null, true ); - - return; - } - ); - - return this; + } ); } } diff --git a/src/system/event/EventDispatcher.ts b/src/system/event/EventDispatcher.ts new file mode 100644 index 0000000..45a15b8 --- /dev/null +++ b/src/system/event/EventDispatcher.ts @@ -0,0 +1,44 @@ +/** + * Event Dispatcher + * + * Copyright (C) 2010-2019 R-T Specialty, LLC. + * + * This file is part of liza. + * + * liza is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + * Dispatch events + */ + +import { EventEmitter } from "events"; + +export class EventDispatcher extends EventEmitter +{ + /** + * Initialize dispatcher + * + * @param _emitter - the event emitter + */ + constructor( + private readonly _emitter: EventEmitter + ) { + super(); + } + + + dispatch( event_id: string, arg: any ): void + { + this._emitter.emit( event_id, arg ); + } +} diff --git a/src/system/event/EventSubscriber.ts b/src/system/event/EventSubscriber.ts new file mode 100644 index 0000000..2950460 --- /dev/null +++ b/src/system/event/EventSubscriber.ts @@ -0,0 +1,44 @@ +/** + * Event Subscriber + * + * Copyright (C) 2010-2019 R-T Specialty, LLC. + * + * This file is part of liza. + * + * liza is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + * Subscribe to events + */ + +import { EventEmitter } from "events"; + +export class EventSubscriber extends EventEmitter +{ + /** + * Initialize subscriber + * + * @param _emitter - the event emitter + */ + constructor( + private readonly _emitter: EventEmitter + ) { + super(); + } + + + subscribe( event_id: string, callback: ( arg: any ) => void ): void + { + this._emitter.on( event_id, callback ); + } +} diff --git a/src/types/mongodb.d.ts b/src/types/mongodb.d.ts index 6cc221f..61d764e 100644 --- a/src/types/mongodb.d.ts +++ b/src/types/mongodb.d.ts @@ -28,6 +28,32 @@ import { PositiveInteger } from "../numeric"; declare module "mongodb"; +export interface MongoDbConfig extends Record { + /** Host */ + host?: string; + + /** Port number */ + port?: number; + + /** High availability */ + ha: boolean; +} + + +/** + * Interface for the mongo database + */ +export interface MongoDb +{ + /** + * Initialize the database connection + * + * @param callback continuation on completion + */ + open( callback: MongoCallback ): void; +} + + /** * Node-style callback for queries */ @@ -139,8 +165,6 @@ declare interface MongoCollection * @param data update data * @param options query options * @param callback continuation on completion - * - * @return callback return value */ update( selector: MongoSelector, diff --git a/test/system/DeltaProcessorTest.ts b/test/system/DeltaProcessorTest.ts index 3e70282..d32197e 100644 --- a/test/system/DeltaProcessorTest.ts +++ b/test/system/DeltaProcessorTest.ts @@ -23,9 +23,11 @@ import { DeltaProcessor as Sut } from '../../src/system/DeltaProcessor'; import { AmqpPublisher } from '../../src/system/AmqpPublisher'; import { DeltaDao } from '../../src/system/db/DeltaDao'; import { MongoDeltaType } from '../../src/system/db/MongoDeltaDao'; +import { EventDispatcher } from '../../src/system/event/EventDispatcher'; import { expect, use as chai_use } from 'chai'; +import { EventEmitter } from 'events'; chai_use( require( 'chai-as-promised' ) ); @@ -166,7 +168,8 @@ describe( 'system.DeltaProcessor', () => { const sut = new Sut( createMockDeltaDao(), - createMockDeltaPublisher() + createMockDeltaPublisher(), + new EventDispatcher( new EventEmitter() ), ); const actual = sut.getTimestampSortedDeltas( given ); @@ -287,7 +290,8 @@ describe( 'system.DeltaProcessor', () => { const sut = new Sut( createMockDeltaDao(), - createMockDeltaPublisher() + createMockDeltaPublisher(), + new EventDispatcher( new EventEmitter() ), ); const actual = sut.getDeltas( given, type ); @@ -301,9 +305,9 @@ describe( 'system.DeltaProcessor', () => function createMockDeltaDao(): DeltaDao { return { - getUnprocessedDocuments() { return this }, - advanceDeltaIndexByType() { return this }, - markDocumentAsProcessed() { return this }, + getUnprocessedDocuments() { return Promise.resolve( [] ); }, + advanceDeltaIndex() { return Promise.resolve( null ); }, + markDocumentAsProcessed() { return Promise.resolve( null ); }, }; } @@ -311,6 +315,6 @@ function createMockDeltaDao(): DeltaDao function createMockDeltaPublisher(): AmqpPublisher { return { - publish() {}, + publish() { return Promise.resolve( null ); }, }; } diff --git a/test/system/DeltaPublisherTest.ts b/test/system/DeltaPublisherTest.ts index 9f72cd1..fecef2d 100644 --- a/test/system/DeltaPublisherTest.ts +++ b/test/system/DeltaPublisherTest.ts @@ -19,12 +19,14 @@ * along with this program. If not, see . */ +import { EventDispatcher } from '../../src/system/event/EventDispatcher'; import { DeltaPublisher as Sut, AmqpConfig } from "../../src/system/DeltaPublisher"; import { expect, use as chai_use } from 'chai'; +import { EventEmitter } from "events"; chai_use( require( 'chai-as-promised' ) ); @@ -34,16 +36,327 @@ describe( 'server.DeltaPublisher', () => { it( 'sends a message', () => { - const conf = createMockConf(); + const conf = createMockConf(); + const dispatcher = new EventDispatcher( new EventEmitter() ); - console.log( new Sut( conf, {} ) ); + console.log( new Sut( conf, dispatcher, ts_ctr ) ); expect( true ).to.be.true - }); - }); + } ); + } ); + + describe( '#sendMessage', () => + { + it( 'sends a message', () => + { + const conf = createMockConf(); + const dispatcher = new EventDispatcher( new EventEmitter() ); + + console.log( new Sut( conf, dispatcher, ts_ctr ) ); + expect( true ).to.be.true + } ); + } ); + + describe( '#avroEncode parses', () => + { + [ + { + label: 'Null value', + valid: true, + delta_data: { foo: null }, + }, + { + label: 'Null array', + valid: true, + delta_data: { foo: { "array": [ null ] } }, + }, + { + label: 'Boolean value', + valid: true, + delta_data: { foo: { "array": [ + { "boolean": true }, + ] } }, + }, + { + label: 'Simple string', + valid: true, + delta_data: { foo: { "array": [ + { "string": 'bar' }, + { "string": 'baz' }, + ] } }, + }, + { + label: 'Simple int', + valid: true, + delta_data: { foo: { "array": [ + { "double": 123 }, + ] } }, + }, + { + label: 'Nested array', + valid: true, + delta_data: { foo: { "array": [ + { "array": [ + { "string": 'bar' }, + ] }, + ] } }, + }, + { + label: 'Array with nulls', + valid: true, + delta_data: { foo: { "array": [ + { "string": 'bar' }, + { "string": 'baz' }, + null, + ] } }, + }, + { + label: 'Nested Array with mixed values', + valid: true, + delta_data: { foo: { "array": [ + { "array": [ + { "string": 'bar' }, + { "double": 123321 }, + null, + ] } + ] } }, + }, + { + label: 'Non-array', + valid: false, + delta_data: { foo: 'bar' }, + }, + { + label: 'Map objects', + valid: true, + delta_data: { "foo": { "array": [ + { "map": { + "bar": { "map": { + "baz": { "double": 1572903485000 }, + } } + } } + ] } }, + } + ].forEach( ( { label, delta_data, valid } ) => + { + it( label, () => + { + let errorCalled = false; + + const dispatcher = { + dispatch( _event_id, _err ) + { + errorCalled = true; + + console.log( 'server.DeltaPublisher.Error' + _err ); + } + } + + const conf = createMockConf(); + const data = createMockData( delta_data ); + const sut = new Sut( conf, dispatcher, ts_ctr ); + const buffer = sut.avroEncode( data ); + + if ( valid ) + { + expect( typeof(buffer) ).to.equal( 'object' ); + } + else + { + expect( buffer ).to.equal( null ); + } + + expect( valid ).to.equal( !errorCalled ); + } ); + } ); + } ); + + + describe( '#avroFormat formats', () => + { + [ + { + label: 'Null', + delta_data: null, + expected: null, + }, + { + label: 'Null Value', + delta_data: { foo: null }, + expected: { foo: null }, + }, + { + label: 'Boolean Value', + delta_data: { foo: [ true ] }, + expected: { foo: { "array": [ + { "boolean": true }, + ] } }, + }, + { + label: 'Simple string', + delta_data: { foo: [ + 'bar', + 'baz', + ] }, + expected: { foo: { "array": [ + { "string": 'bar' }, + { "string": 'baz' }, + ] } }, + }, + { + label: 'Simple int', + delta_data: { foo: [ + 123 + ] }, + expected: { foo: { "array": [ + { "double": 123 }, + ] } }, + }, + { + label: 'Nested array', + delta_data: { foo: [ + [ + 'bar', + 'baz', + ] + ] }, + expected: { foo: { "array": [ + { "array": [ + { "string": 'bar' }, + { "string": 'baz' }, + ] }, + ] } }, + }, + { + label: 'Double nested array', + delta_data: { foo: [ + [ + [ + 'bar', + 123, + null + ], + ], + ] }, + expected: { foo: { "array": [ + { "array": [ + { "array": [ + { "string": 'bar' }, + { "double": 123 }, + null, + ] }, + ] }, + ] } }, + }, + { + label: 'Array with nulls', + delta_data: { foo: [ + 'bar', + 'baz', + null + ] }, + expected: { foo: { "array": [ + { "string": 'bar' }, + { "string": 'baz' }, + null + ] } }, + }, + { + label: 'Nested Array with mixed values', + delta_data: { foo: [ + [ + 'bar', + 123321, + null, + ] + ] }, + expected: { foo: { "array": [ + { "array": [ + { "string": 'bar' }, + { "double": 123321 }, + null, + ] }, + ] } }, + }, + { + label: 'Nested Array with mixed values', + delta_data: { foo: [ + { + "bar": { + "wer": 'qaz', + "qwe": 1572903485000, + "asd": true, + "zxc": null, + }, + }, + ] }, + expected: { "foo": { "array": [ + { "map": { + "bar": { "map": { + "wer": { "string": 'qaz' }, + "qwe": { "double": 1572903485000 }, + "asd": { "boolean": true }, + "zxc": null, + } }, + } }, + ] } }, + }, + ].forEach( ( { label, delta_data, expected } ) => + { + it( label, () => + { + const dispatcher = {} + const conf = createMockConf(); + const sut = new Sut( conf, dispatcher, ts_ctr ); + const actual = sut.avroFormat( delta_data ); + + expect( actual ).to.deep.equal( expected ); + } ); + } ); + } ); } ); +function ts_ctr(): UnixTimestamp +{ + return Math.floor( new Date().getTime() / 1000 ); +} function createMockConf(): AmqpConfig { return {}; } + + +function createMockData( delta_data: any ): any +{ + + return { + event: { + id: 'RATE', + ts: 1573856916, + actor: 'SERVER', + step: null, + }, + document: { + id: 123123, + created: 1573856916, + modified: 1573856916, + top_visited_step: '2', + }, + session: { + entity_name: 'Foobar', + entity_id: 123123 , + }, + data: null, + delta: { + Data: { + bucket: delta_data, + }, + }, + program: { + Program: { + id: 'quote_server', + version: 'dadaddwafdwa', + }, + }, + }; +} \ No newline at end of file