diff --git a/.env b/.env index 2959fba..21f018c 100644 --- a/.env +++ b/.env @@ -1,15 +1,17 @@ -NODE_ENV=dev -AMQP_HOSTNAME=localhost +AMQP_HOST=localhost AMQP_PORT=5672 -AMQP_USERNAME= -AMQP_PASSWORD= +AMQP_USER= +AMQP_PASS= AMQP_FRAMEMAX=0 AMQP_HEARTBEAT=2 AMQP_VHOST= AMQP_EXCHANGE= AMQP_RETRIES=30 AMQP_RETRY_WAIT=1 -PROM_HOSTNAME= +PROM_HOST= PROM_PORT=9091 PROM_PUSH_INTERVAL_MS=5000 +PROM_BUCKETS_START=0 +PROM_BUCKETS_WIDTH=10 +PROM_BUCKETS_COUNT=10 PROCESS_INTERVAL_MS=2000 diff --git a/bin/delta-processor.ts b/bin/delta-processor.ts index 8a35e51..83e42e3 100644 --- a/bin/delta-processor.ts +++ b/bin/delta-processor.ts @@ -18,13 +18,14 @@ * You should have received a copy of the GNU General Public License * along with this program. If not, see . */ -import * as amqplib from "amqplib"; +import * as amqplib from 'amqplib'; import { createAmqpConfig } from '../src/system/AmqpPublisher'; import { MongoDeltaDao } from '../src/system/db/MongoDeltaDao'; import { DeltaProcessor } from '../src/system/DeltaProcessor'; import { DeltaPublisher } from '../src/system/DeltaPublisher'; import { MongoCollection } from '../src/types/mongodb'; import { createAvroEncoder } from '../src/system/avro/AvroFactory'; +import { V1MessageWriter } from '../src/system/avro/V1MessageWriter'; import { createMongoConfig, createMongoDB, @@ -39,8 +40,9 @@ import { createPrometheusConfig, } from '../src/system/PrometheusFactory'; import { AmqpConnection } from '../src/system/amqp/AmqpConnection'; -import { parse as avro_parse } from "avro-js"; +import { parse as avro_parse } from 'avro-js'; +require('dotenv-flow').config(); const amqp_conf = createAmqpConfig( process.env ); const prom_conf = createPrometheusConfig( process.env ); @@ -51,12 +53,17 @@ const env = process.env.NODE_ENV || 'Unknown Environment'; const emitter = new EventEmitter(); const log = new StandardLogger( console, ts_ctr, env ); const amqp_connection = new AmqpConnection( amqplib, amqp_conf, emitter ); -const publisher = new DeltaPublisher( + +const message_writer = new V1MessageWriter( + createAvroEncoder, + avro_parse( __dirname + '/../src/system/avro/schema.avsc' ), +); + +const publisher = new DeltaPublisher( emitter, ts_ctr, - createAvroEncoder, amqp_connection, - avro_parse( __dirname + '/../src/system/avro/schema.avsc' ), + message_writer, ); // Prometheus Metrics diff --git a/package.json.in b/package.json.in index 898ec54..49ffc7b 100644 --- a/package.json.in +++ b/package.json.in @@ -24,9 +24,10 @@ }, "dependencies": { - "easejs": "0.2.x", - "mongodb": "1.2.14", - "amqplib": "0.5.3" + "easejs": "0.2.x", + "mongodb": "1.2.14", + "dotenv-flow": "3.1.0", + "amqplib": "0.5.3" }, "devDependencies": { "typescript": "~3.7", diff --git a/src/bucket/delta.ts b/src/bucket/delta.ts index b83a8e7..7c5dd6a 100644 --- a/src/bucket/delta.ts +++ b/src/bucket/delta.ts @@ -81,6 +81,15 @@ export interface DeltaDocument /** The document id */ id: DocumentId, + /** The entity name */ + agentName: string, + + /** The entity id */ + agentEntityId: number, + + /** The time the document was created */ + startDate: UnixTimestamp, + /** The time the document was updated */ lastUpdate: UnixTimestamp, diff --git a/src/document/Document.ts b/src/document/Document.ts index 0db893a..8f05bac 100644 --- a/src/document/Document.ts +++ b/src/document/Document.ts @@ -18,7 +18,7 @@ * You should have received a copy of the GNU General Public License * along with this program. If not, see . * - * The term "Quote" is synonymous with "Document"; this project is moving + * The term 'Quote' is synonymous with 'Document'; this project is moving * more toward the latter as it is further generalized. */ @@ -31,7 +31,29 @@ export type DocumentId = NominalType; /** * Quote (Document) id * - * Where the term "Quote" is still used, this will allow for type + * Where the term 'Quote' is still used, this will allow for type * compatibility and an easy transition. */ export type QuoteId = DocumentId; + + +/** + * Document meta data + */ +export type DocumentMeta = +{ + /** The document id */ + id: DocumentId, + + /** The entity name */ + entity_name: string, + + /** The entity id */ + entity_id: number, + + /** The time the document was created */ + startDate: UnixTimestamp, + + /** The time the document was updated */ + lastUpdate: UnixTimestamp, +} \ No newline at end of file diff --git a/src/server/db/MongoServerDao.ts b/src/server/db/MongoServerDao.ts index dd1df2d..3f8128c 100644 --- a/src/server/db/MongoServerDao.ts +++ b/src/server/db/MongoServerDao.ts @@ -309,7 +309,7 @@ export class MongoServerDao extends EventEmitter implements ServerDao save_data.id = id; save_data.pver = quote.getProgramVersion(); save_data.importDirty = 1; - save_data.published = 0; + save_data.published = false; save_data.lastPremDate = quote.getLastPremiumDate(); save_data.initialRatedDate = quote.getRatedDate(); save_data.explicitLock = quote.getExplicitLockReason(); diff --git a/src/system/AmqpPublisher.ts b/src/system/AmqpPublisher.ts index edaa8c1..f41ee63 100644 --- a/src/system/AmqpPublisher.ts +++ b/src/system/AmqpPublisher.ts @@ -21,8 +21,8 @@ * Publish Amqp message to a queue */ -import { DeltaResult } from "../bucket/delta"; -import { DocumentId } from '../document/Document'; +import { DeltaResult } from '../bucket/delta'; +import { DocumentMeta } from '../document/Document'; import { Options } from 'amqplib'; @@ -37,10 +37,10 @@ export function createAmqpConfig( env: NodeJS.ProcessEnv ): AmqpConfig { return { protocol: 'amqp', - hostname: env.AMQP_HOSTNAME, + hostname: env.AMQP_HOST, port: +( env.AMQP_PORT || 0 ), - username: env.AMQP_USERNAME, - password: env.AMQP_PASSWORD, + username: env.AMQP_USER, + password: env.AMQP_PASS, locale: 'en_US', frameMax: +( env.AMQP_FRAMEMAX || 0 ), heartbeat: +( env.AMQP_HEARTBEAT || 0 ), @@ -52,8 +52,9 @@ export function createAmqpConfig( env: NodeJS.ProcessEnv ): AmqpConfig } -export interface AmqpConfig extends Options.Connect { - /** The protocol to connect with (should always be "amqp") */ +export interface AmqpConfig extends Options.Connect +{ + /** The protocol to connect with (should always be 'amqp') */ protocol: string; /** The hostname to connect to */ @@ -68,7 +69,7 @@ export interface AmqpConfig extends Options.Connect { /** A password if one if required */ password?: string; - /** Locale (should always be "en_US") */ + /** Locale (should always be 'en_US') */ locale: string; /** The size in bytes of the maximum frame allowed */ @@ -96,13 +97,13 @@ export interface AmqpPublisher /** * Publish quote message to exchange post-rating * - * @param doc_id - The doc_id - * @param delta - The delta - * @param bucket - The bucket - * @param ratedata - The rate data bucket + * @param meta - document meta data + * @param delta - delta + * @param bucket - bucket + * @param ratedata - rate data bucket */ publish( - doc_id: DocumentId, + meta: DocumentMeta, delta: DeltaResult, bucket: Record, ratedata?: Record, diff --git a/src/system/DeltaProcessor.ts b/src/system/DeltaProcessor.ts index 0e5201a..4eba003 100644 --- a/src/system/DeltaProcessor.ts +++ b/src/system/DeltaProcessor.ts @@ -19,17 +19,17 @@ * along with this program. If not, see . */ -import { DeltaDao } from "../system/db/DeltaDao"; -import { DocumentId } from "../document/Document"; -import { AmqpPublisher } from "./AmqpPublisher"; -import { EventEmitter } from "events"; +import { DeltaDao } from '../system/db/DeltaDao'; +import { DocumentMeta } from '../document/Document'; +import { AmqpPublisher } from './AmqpPublisher'; +import { EventEmitter } from 'events'; import { DeltaType, applyDelta, DeltaDocument, Delta, ReverseDelta, -} from "../bucket/delta"; +} from '../bucket/delta'; /** Deltas and state of data prior to their application */ type DeltaState = [ @@ -77,6 +77,11 @@ export class DeltaProcessor } + /** + * Process the next document + * + * @param docs - list of documents to process + */ private _processNext( docs: DeltaDocument[] ): Promise { const doc = docs.shift(); @@ -91,28 +96,38 @@ export class DeltaProcessor } + /** + * Process an individual document + * + * @param doc - individual document to process + */ private _processDocument( doc: DeltaDocument ): Promise { const deltas = this._getTimestampSortedDeltas( doc ); - const doc_id = doc.id; const bucket = doc.data; const ratedata = doc.ratedata || {}; - const last_updated_ts = doc.lastUpdate; + const meta = { + id: doc.id, + entity_name: doc.agentName, + entity_id: +doc.agentEntityId, + startDate: doc.startDate, + lastUpdate: doc.lastUpdate, + }; const history = this._applyDeltas( deltas, bucket, ratedata ); - return this._processNextDelta( doc_id, history ) + return this._processNextDelta( meta, history ) .then( _ => - this._dao.markDocumentAsProcessed( doc_id, last_updated_ts ) + this._dao.markDocumentAsProcessed( meta.id, meta.lastUpdate ) ) .then( _ => { - this._emitter.emit( 'document-processed', { doc_id: doc_id } ); + this._emitter.emit( 'document-processed', { doc_id: meta.id } ); } ) - .catch( e => + .catch( ( e: Error ) => { this._emitter.emit( 'error', e ); - return this._dao.setErrorFlag( doc_id ); + return this._dao.setErrorFlag( meta.id ); } ); } @@ -179,8 +194,14 @@ export class DeltaProcessor } + /** + * Process the next delta from the history + * + * @param meta - document meta data + * @param history - a history of deltas and their buckets (data, ratedata) + */ private _processNextDelta( - doc_id: DocumentId, + meta: DocumentMeta, history: DeltaState[], ): Promise { @@ -191,17 +212,14 @@ export class DeltaProcessor const [ delta, bucket, ratedata ] = history[ 0 ]; - const delta_uid = doc_id + '_' + delta.timestamp + '_' + delta.type; + const delta_uid = meta.id + '_' + delta.timestamp + '_' + delta.type; this._emitter.emit( 'delta-process-start', delta_uid ); - return this._publisher.publish( doc_id, delta, bucket, ratedata ) - .then( _ => this._dao.advanceDeltaIndex( doc_id, delta.type ) ) + return this._publisher.publish( meta, delta, bucket, ratedata ) + .then( _ => this._dao.advanceDeltaIndex( meta.id, delta.type ) ) .then( _ => this._emitter.emit( 'delta-process-end', delta_uid ) ) - .then( _ => this._processNextDelta( - doc_id, - history.slice( 1 ), - ) ); + .then( _ => this._processNextDelta( meta, history.slice( 1 ) ) ); } diff --git a/src/system/DeltaPublisher.ts b/src/system/DeltaPublisher.ts index 3963934..57a5747 100644 --- a/src/system/DeltaPublisher.ts +++ b/src/system/DeltaPublisher.ts @@ -23,82 +23,43 @@ import { AmqpPublisher } from './AmqpPublisher'; import { Delta } from '../bucket/delta'; -import { EventEmitter } from "events"; -import { DocumentId } from '../document/Document'; +import { EventEmitter } from 'events'; +import { DocumentMeta } from '../document/Document'; import { context } from '../error/ContextError'; import { AmqpError } from '../error/AmqpError'; -import { AvroEncoderCtr } from './avro/AvroFactory'; +import { MessageWriter } from './MessageWriter'; + import { AmqpConnection } from './amqp/AmqpConnection'; -import { AvroSchema } from "avro-js"; export class DeltaPublisher implements AmqpPublisher { - /** A mapping of which delta type translated to which avro event */ - readonly DELTA_MAP: Record = { - data: 'STEP_SAVE', - ratedata: 'RATE', - }; - - /** * Delta publisher * - * @param _emitter - event emitter instance - * @param _ts_ctr - a timestamp constructor - * @param _encoder_ctr - a factory function to create an avro encoder - * @param _conn - the amqp connection + * @param _emitter - event emitter instance + * @param _ts_ctr - a timestamp constructor + * @param _conn - the amqp connection + * @param _writer - message writer */ constructor( - private readonly _emitter: EventEmitter, - private readonly _ts_ctr: () => UnixTimestamp, - private readonly _encoder_ctor: AvroEncoderCtr, - private readonly _conn: AmqpConnection, - private readonly _schema: AvroSchema, + private readonly _emitter: EventEmitter, + private readonly _ts_ctr: () => UnixTimestamp, + private readonly _conn: AmqpConnection, + private readonly _writer: MessageWriter, ) {} /** * Publish quote message to exchange post-rating * - * @param doc_id - The doc_id - * @param delta - The delta - * @param bucket - The bucket - * @param ratedata - The ratedata bucket + * @param meta - document meta data + * @param delta - delta + * @param bucket - bucket + * @param ratedata - rate data bucket */ - publish( - doc_id: DocumentId, - delta: Delta, - bucket: Record, - ratedata: Record = {}, - ): Promise - { - return this._sendMessage( doc_id, delta, bucket, ratedata ) - .then( _ => - { - this._emitter.emit( - 'delta-publish', - { - delta: delta, - exchange: this._conn.getExchangeName(), - } - ); - } ); - } - - - /** - * Send message to exchange - * - * @param doc_id - The doc_id - * @param delta - The delta to publish - * @param bucket - The bucket - * @param ratedata - The ratedata bucket - * - * @return whether publish was successful - */ - private _sendMessage( - doc_id: DocumentId, + publish( + meta: DocumentMeta, delta: Delta, bucket: Record, ratedata: Record, @@ -106,16 +67,14 @@ export class DeltaPublisher implements AmqpPublisher { const ts = this._ts_ctr(); const headers = { version: 1, created: ts }; - const avro_object = this._avroFormat( + + return this._writer.write( ts, - doc_id, + meta, delta, bucket, - ratedata, - ); - - return this.avroEncode( avro_object ) - .then( ( avro_buffer ) => + ratedata + ).then( ( avro_buffer: Buffer ) => { const channel = this._conn.getAmqpChannel(); @@ -124,7 +83,7 @@ export class DeltaPublisher implements AmqpPublisher throw context( new AmqpError( 'Error sending message: No channel' ), { - doc_id: doc_id, + doc_id: meta.id, delta_type: delta.type, delta_ts: delta.timestamp, }, @@ -144,193 +103,22 @@ export class DeltaPublisher implements AmqpPublisher throw context( new Error ( 'Delta publish failed' ), { - doc_id: doc_id, + doc_id: meta.id, delta_type: delta.type, delta_ts: delta.timestamp, } ); } + } ) + .then( ( _: any ) => + { + this._emitter.emit( + 'delta-publish', + { + delta: delta, + exchange: this._conn.getExchangeName(), + } + ); } ); } - - - /** - * Throw an error with specific information if the schema is invalid - * - * @param schema - Avro schema - * @param data - Data to encode - */ - private _assertValidAvro( - schema: AvroSchema, - data: Record, - ): void - { - schema.isValid( data, { errorHook: hook } ); - - function hook( keys: any, vals: any) { - throw context( new Error( 'Invalid Avro Schema' ), - { - invalid_paths: keys, - invalid_data: vals, - } - ); - } - } - - - /** - * Format the avro data with data type labels - * - * @param ts - a timestamp - * @param doc_id - the document id - * @param delta - the current delta - * @param bucket - the data bucket - * @param ratedata - the ratedata bucket - * - * @return the formatted data - */ - private _avroFormat( - ts: UnixTimestamp, - doc_id: DocumentId, - delta: Delta, - bucket: Record, - ratedata: Record, - ): any - { - const delta_formatted = this.setDataTypes( delta.data ); - const bucket_formatted = this.setDataTypes( bucket ); - const ratedata_formatted = this.setDataTypes( ratedata ); - const event_id = this.DELTA_MAP[ delta.type ]; - - return { - event: { - id: event_id, - ts: ts, - actor: 'SERVER', - step: null, - }, - document: { - id: doc_id - }, - data: { - Data: { - bucket: bucket_formatted, - }, - }, - ratedata: { - Data: { - bucket: ratedata_formatted, - }, - }, - delta: { - Data: { - bucket: delta_formatted, - }, - }, - program: { - Program: { - id: 'quote_server', - version: '', - }, - }, - } - } - - - /** - * Encode the data in an avro buffer - * - * @param data - the data to encode - * - * @return the avro buffer or null if there is an error - */ - avroEncode( data: Record ): Promise - { - return new Promise( ( resolve, reject ) => - { - const bufs: Buffer[] = []; - - try - { - this._assertValidAvro( this._schema, data ) - - const encoder = this._encoder_ctor( this._schema ) - - encoder.on('data', ( buf: Buffer ) => { bufs.push( buf ) } ) - encoder.on('error', ( err: Error ) => { reject( err ); } ) - encoder.on('end', () => { resolve( Buffer.concat( bufs ) ) } ) - encoder.end( data ); - } - catch ( e ) - { - reject( e ); - } - } ); - } - - - /** - * Format the data for avro by add type specifications to the data - * - * @param data - the data to format - * - * @return the formatted data - */ - setDataTypes( data: any, top_level: boolean = true ): any - { - let data_formatted: any = {}; - - switch( typeof( data ) ) - { - case 'object': - if ( data == null ) - { - return null; - } - else if ( Array.isArray( data ) ) - { - let arr: any[] = []; - - data.forEach( ( datum ) => - { - arr.push( this.setDataTypes( datum, false ) ); - } ); - - data_formatted = ( top_level ) - ? arr - : { 'array': arr }; - } - else - { - let datum_formatted: any = {}; - - Object.keys( data).forEach( ( key: string ) => - { - const datum = this.setDataTypes( data[ key ], false ); - - datum_formatted[ key ] = datum; - - } ); - - data_formatted = ( top_level ) - ? datum_formatted - : { "map": datum_formatted }; - } - break; - - case 'boolean': - return { 'boolean': data }; - - case 'number': - return { 'double': data }; - - case 'string': - return { 'string': data }; - - case 'undefined': - return null; - } - - return data_formatted; - } } diff --git a/src/system/EventMediator.ts b/src/system/EventMediator.ts index cd5b703..b95536c 100644 --- a/src/system/EventMediator.ts +++ b/src/system/EventMediator.ts @@ -48,7 +48,7 @@ export class EventMediator msg ) ); - this._emitter.on( 'amqp-conn-error', ( msg ) => + this._emitter.on( 'amqp-conn-warn', ( msg ) => this._log.warning( 'AMQP Connection Error', msg ) ); this._emitter.on( 'amqp-reconnect', () => @@ -68,6 +68,11 @@ export class EventMediator } + /** + * Handle an error event + * + * @param e - any + */ private _handleError( e: any ): void { let msg: string = ''; diff --git a/src/system/MessageWriter.ts b/src/system/MessageWriter.ts new file mode 100644 index 0000000..8ee2b84 --- /dev/null +++ b/src/system/MessageWriter.ts @@ -0,0 +1,44 @@ +/** + * Message Writer + * + * Copyright (C) 2010-2019 R-T Specialty, LLC. + * + * This file is part of liza. + * + * liza is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + * Write a message to be published to a queue + */ +import { DocumentMeta } from '../document/Document'; +import { DeltaResult } from '../bucket/delta'; + +export interface MessageWriter +{ + /** + * Write the data to a message + * + * @param ts - timestamp + * @param meta - document meta data + * @param delta - current delta + * @param bucket - data bucket + * @param ratedata - ratedata bucket + */ + write( + ts: UnixTimestamp, + meta: DocumentMeta, + delta: DeltaResult, + bucket: Record, + ratedata: Record, + ): Promise +} \ No newline at end of file diff --git a/src/system/MetricsCollector.ts b/src/system/MetricsCollector.ts index bc4a6cc..9432a9d 100644 --- a/src/system/MetricsCollector.ts +++ b/src/system/MetricsCollector.ts @@ -22,7 +22,7 @@ */ import { Histogram, Pushgateway, Counter, Gauge } from 'prom-client'; -import { EventEmitter } from "events"; +import { EventEmitter } from 'events'; import { PrometheusFactory, PrometheusConfig } from './PrometheusFactory'; const client = require( 'prom-client' ) @@ -72,6 +72,7 @@ export class MetricsCollector * @param _factory - A factory to create prometheus components * @param _conf - Prometheus configuration * @param _emitter - Event emitter + * @param _timer - A timer function to create a tuple timestamp */ constructor( private readonly _factory: PrometheusFactory, @@ -96,9 +97,9 @@ export class MetricsCollector client, this._process_time_name, this._process_time_help, - 0, - 10, - 10, + this._conf.buckets_start, + this._conf.buckets_width, + this._conf.buckets_count, ); this._total_error = this._factory.createCounter( @@ -123,7 +124,8 @@ export class MetricsCollector this._push_interval = setInterval( () => { this._gateway.pushAdd( - { jobName: 'liza_delta_metrics' }, this.pushCallback + { jobName: 'liza_delta_metrics' }, + this.getPushCallback( this ) ); }, this._conf.push_interval_ms ); @@ -145,7 +147,7 @@ export class MetricsCollector /** * List to events to update metrics */ - private hookMetrics() + private hookMetrics(): void { this._emitter.on( 'delta-process-start', @@ -165,35 +167,36 @@ export class MetricsCollector } ); - this._emitter.on( - 'delta-process-error', - ( _ ) => this._total_error.inc() - ); + this._emitter.on( 'error', ( _ ) => this._total_error.inc() ); } /** * Handle push error * - * @param error - Any errors that occurred - * @param response - The http response - * @param body - The resposne body + * @param self - Metrics Collector object + * + * @return a function to handle the pushAdd callback */ - private pushCallback( - error?: Error | undefined, - _response?: any, - _body?: any - ): void + private getPushCallback( self: MetricsCollector ): () => void { - if ( error ) + return ( + error?: Error | undefined, + _response?: any, + _body?: any + ): void => { - this._emitter.emit( 'error', error ); + if ( error ) + { + self._emitter.emit( 'error', error ); + } } } - /** * Update metrics with current error count + * + * @param count - the number of errors found */ updateErrorCount( count: number ): void { diff --git a/src/system/PrometheusFactory.ts b/src/system/PrometheusFactory.ts index 0cf059f..7330ea9 100644 --- a/src/system/PrometheusFactory.ts +++ b/src/system/PrometheusFactory.ts @@ -35,6 +35,15 @@ export declare type PrometheusConfig = { /** The rate (in milliseconds) at which metrics are pushed */ push_interval_ms: number; + + /** The starting point for process time buckets */ + buckets_start: number; + + /** The width of process time buckets */ + buckets_width: number; + + /** The number of process time buckets */ + buckets_count: number; } @@ -50,10 +59,13 @@ export function createPrometheusConfig( ): PrometheusConfig { return { - 'hostname': env.prom_hostname, - 'port': +( env.prom_port || 0 ), - 'env': process.env.NODE_ENV, - 'push_interval_ms': +( process.env.prom_push_interval_ms || 5000 ), + hostname: env.PROM_HOST, + port: +( env.PROM_PORT || 0 ), + env: process.env.NODE_ENV, + push_interval_ms: +( process.env.PROM_PUSH_INTERVAL_MS || 5000 ), + buckets_start: +( process.env.PROM_BUCKETS_START || 0 ), + buckets_width: +( process.env.PROM_BUCKETS_WIDTH || 10 ), + buckets_count: +( process.env.PROM_BUCKETS_COUNT || 10 ), }; } @@ -63,8 +75,9 @@ export class PrometheusFactory /** * Create a PushGateway * - * @param client - prometheus client - * @param url - the url of the push gateway + * @param client - prometheus client + * @param hostname - push gateway url + * @param port - push gateway port * * @return the gateway */ diff --git a/src/system/StandardLogger.ts b/src/system/StandardLogger.ts index d69c3d4..cdd062d 100644 --- a/src/system/StandardLogger.ts +++ b/src/system/StandardLogger.ts @@ -160,6 +160,7 @@ export class StandardLogger implements PsrLogger * * @param msg - the string or object to log * @param level - the log level + * @param context - additional message context * * @returns a structured logging object */ diff --git a/src/system/amqp/AmqpConnection.ts b/src/system/amqp/AmqpConnection.ts index 6d50bfc..410f808 100644 --- a/src/system/amqp/AmqpConnection.ts +++ b/src/system/amqp/AmqpConnection.ts @@ -24,8 +24,6 @@ import * as amqplib from "amqplib"; /** * Connection to AMQP exchange - * - * XXX: Needs tests! */ export class AmqpConnection { @@ -39,6 +37,7 @@ export class AmqpConnection /** * Amqp Connection * + * @param _conf - amqp library * @param _conf - amqp configuration * @param _emitter - event emitter instance */ @@ -65,7 +64,7 @@ export class AmqpConnection */ this._conn.once( 'error', e => { - this._emitter.emit( 'amqp-conn-error', e ); + this._emitter.emit( 'amqp-conn-warn', e ); this._reconnect(); } ); diff --git a/src/system/avro/V1MessageWriter.ts b/src/system/avro/V1MessageWriter.ts new file mode 100644 index 0000000..09ee64a --- /dev/null +++ b/src/system/avro/V1MessageWriter.ts @@ -0,0 +1,259 @@ +/** + * Message Writer + * + * Copyright (C) 2010-2019 R-T Specialty, LLC. + * + * This file is part of liza. + * + * liza is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + * Write a message to be published to a queue + */ +import { DocumentMeta } from '../../document/Document'; +import { Delta } from '../../bucket/delta'; +import { AvroEncoderCtr } from '../avro/AvroFactory'; +import { AvroSchema } from 'avro-js'; +import { MessageWriter } from '../MessageWriter'; +import { context } from '../../error/ContextError'; + + +export class V1MessageWriter implements MessageWriter +{ + /** A mapping of which delta type translated to which avro event */ + readonly DELTA_MAP: Record = { + data: 'STEP_SAVE', + ratedata: 'RATE', + }; + + + /** + * Delta publisher + * + * @param _encoder_ctr - a factory function to create an avro encoder + * @param _conn - the amqp connection + */ + constructor( + private readonly _encoder_ctor: AvroEncoderCtr, + private readonly _schema: AvroSchema, + ) {} + + + /** + * Write the data to a message + * + * @param ts - timestamp + * @param meta - document meta data + * @param delta - current delta + * @param bucket - data bucket + * @param ratedata - ratedata bucket + */ + write( + ts: UnixTimestamp, + meta: DocumentMeta, + delta: Delta, + bucket: Record, + ratedata: Record, + ): Promise + { + const avro_object = this._avroFormat( + ts, + meta, + delta, + bucket, + ratedata, + ); + + return this.avroEncode( avro_object ); + } + + + /** + * Format the avro data with data type labels + * + * @param ts - timestamp + * @param meta - document meta data + * @param delta - current delta + * @param bucket - data bucket + * @param ratedata - ratedata bucket + * + * @return the formatted data + */ + private _avroFormat( + ts: UnixTimestamp, + meta: DocumentMeta, + delta: Delta, + bucket: Record, + ratedata: Record, + ): any + { + const delta_formatted = this.setDataTypes( delta.data ); + const bucket_formatted = this.setDataTypes( bucket ); + const ratedata_formatted = this.setDataTypes( ratedata ); + const event_id = this.DELTA_MAP[ delta.type ]; + + return { + event: { + id: event_id, + ts: ts, + actor: 'SERVER', + step: null, + }, + document: { + id: meta.id, + created: meta.startDate, + modified: meta.lastUpdate, + }, + session: { + Session: { + entity_id: meta.entity_id, + entity_name: meta.entity_name, + }, + }, + data: { + Data: { + bucket: bucket_formatted, + }, + }, + ratedata: { + Data: { + bucket: ratedata_formatted, + }, + }, + delta: { + Data: { + bucket: delta_formatted, + }, + }, + program: { + Program: { + id: 'quote_server', + version: '', + }, + }, + } + } + + + /** + * Encode the data in an avro buffer + * + * @param data - the data to encode + * + * @return the avro buffer or null if there is an error + */ + avroEncode( data: Record ): Promise + { + return new Promise( ( resolve, reject ) => + { + const bufs: Buffer[] = []; + + try + { + this._schema.isValid( + data, + { + errorHook: ( keys: any, vals: any) => + { + throw context( + new Error( 'Invalid Avro Schema' ), + { + invalid_paths: keys, + invalid_data: vals, + } + ); + } + } + ); + + const encoder = this._encoder_ctor( this._schema ) + + encoder.on('data', ( buf: Buffer ) => { bufs.push( buf ) } ) + encoder.on('error', ( err: Error ) => { reject( err ); } ) + encoder.on('end', () => { resolve( Buffer.concat( bufs ) ) } ) + encoder.end( data ); + } + catch ( e ) + { + reject( e ); + } + } ); + } + + + /** + * Format the data for avro by add type specifications to the data + * + * @param data - the data to format + * @param top_level - whether we are at the top level of the recursion + * + * @return the formatted data + */ + setDataTypes( data: any, top_level: boolean = true ): any + { + let data_formatted: any = {}; + + switch( typeof( data ) ) + { + case 'object': + if ( data == null ) + { + return null; + } + else if ( Array.isArray( data ) ) + { + let arr: any[] = []; + + data.forEach( ( datum ) => + { + arr.push( this.setDataTypes( datum, false ) ); + } ); + + data_formatted = ( top_level ) + ? arr + : { 'array': arr }; + } + else + { + let datum_formatted: any = {}; + + Object.keys( data).forEach( ( key: string ) => + { + const datum = this.setDataTypes( data[ key ], false ); + + datum_formatted[ key ] = datum; + + } ); + + data_formatted = ( top_level ) + ? datum_formatted + : { 'map': datum_formatted }; + } + break; + + case 'boolean': + return { 'boolean': data }; + + case 'number': + return { 'double': data }; + + case 'string': + return { 'string': data }; + + case 'undefined': + return null; + } + + return data_formatted; + } +} \ No newline at end of file diff --git a/src/system/avro/schema.avsc b/src/system/avro/schema.avsc index 53e4a9d..f432e9a 100644 --- a/src/system/avro/schema.avsc +++ b/src/system/avro/schema.avsc @@ -34,7 +34,7 @@ }, { "name": "step", - "type":[ + "type": [ "null", { "type": "record", @@ -77,6 +77,26 @@ ] } }, + { + "name": "session", + "type": [ + "null", + { + "type": "record", + "name": "Session", + "fields": [ + { + "name": "entity_name", + "type": "string" + }, + { + "name": "entity_id", + "type": "int" + } + ] + } + ] + }, { "name": "data", "type": [ diff --git a/src/system/db/MongoDeltaDao.ts b/src/system/db/MongoDeltaDao.ts index 8d45684..e058980 100644 --- a/src/system/db/MongoDeltaDao.ts +++ b/src/system/db/MongoDeltaDao.ts @@ -40,6 +40,9 @@ export class MongoDeltaDao implements DeltaDao /** The document fields to read */ readonly RESULT_FIELDS: Record = { id: 1, + agentName: 1, + agentEntityId: 1, + startDate: 1, lastUpdate: 1, data: 1, ratedata: 1, @@ -70,7 +73,7 @@ export class MongoDeltaDao implements DeltaDao this._collection.find( { published: false, - deltaError: false, + deltaError: { $ne: true }, }, { fields: this.RESULT_FIELDS }, ( e, cursor ) => diff --git a/test/system/DeltaProcessorTest.ts b/test/system/DeltaProcessorTest.ts index 2cd68e2..1587bb2 100644 --- a/test/system/DeltaProcessorTest.ts +++ b/test/system/DeltaProcessorTest.ts @@ -22,7 +22,7 @@ import { DeltaProcessor as Sut } from '../../src/system/DeltaProcessor'; import { AmqpPublisher } from '../../src/system/AmqpPublisher'; import { DeltaDao } from '../../src/system/db/DeltaDao'; -import { DeltaDocument } from "../../src/bucket/delta"; +import { DeltaDocument } from '../../src/bucket/delta'; import { DocumentId } from '../../src/document/Document'; import { EventEmitter } from 'events'; @@ -40,7 +40,7 @@ describe( 'system.DeltaProcessor', () => expected: any }[]>[ { - label: "No deltas are processed", + label: 'No deltas are processed', given: [ { id: 123, @@ -53,9 +53,9 @@ describe( 'system.DeltaProcessor', () => expected: [], }, - // when quote is initialized: { foo: [ "" ], state: [ "a" ] } + // when quote is initialized: { foo: [ '' ], state: [ 'a' ] } { - label: "Publishes deltas in order", + label: 'Publishes deltas in order', given: [ { @@ -63,13 +63,13 @@ describe( 'system.DeltaProcessor', () => lastUpdate: 123123123, data: { - foo: [ "third" ], - state: [ "a", "b", "c", "d" ], + foo: [ 'third' ], + state: [ 'a', 'b', 'c', 'd' ], }, ratedata: { - prem: [ "rate_second" ], - state: [ "i", "ii", "iii" ], + prem: [ 'rate_second' ], + state: [ 'i', 'ii', 'iii' ], }, rdelta: { @@ -77,21 +77,21 @@ describe( 'system.DeltaProcessor', () => { timestamp: 1, data: { - foo: [ "" ], + foo: [ '' ], state: [ undefined, null ], }, }, { timestamp: 3, data: { - foo: [ "first" ], + foo: [ 'first' ], state: [ undefined, undefined, null ], }, }, { timestamp: 5, data: { - foo: [ "second" ], + foo: [ 'second' ], state: [ undefined, undefined, undefined, null ], }, }, @@ -101,14 +101,14 @@ describe( 'system.DeltaProcessor', () => { timestamp: 2, data: { - prem: [ "" ], + prem: [ '' ], state: [ undefined, null ], }, }, { timestamp: 4, data: { - prem: [ "rate_first" ], + prem: [ 'rate_first' ], state: [ undefined, undefined, null ], }, }, @@ -122,12 +122,12 @@ describe( 'system.DeltaProcessor', () => { doc_id: 123, rdelta: { - foo: [ "" ], + foo: [ '' ], state: [ undefined, null ], }, bucket: { - foo: [ "first" ], - state: [ "a", "b" ], + foo: [ 'first' ], + state: [ 'a', 'b' ], }, ratedata: {}, }, @@ -136,16 +136,16 @@ describe( 'system.DeltaProcessor', () => { doc_id: 123, rdelta: { - prem: [ "" ], + prem: [ '' ], state: [ undefined, null ], }, bucket: { - foo: [ "first" ], - state: [ "a", "b" ], + foo: [ 'first' ], + state: [ 'a', 'b' ], }, ratedata: { - prem: [ "rate_first" ], - state: [ "i", "ii" ], + prem: [ 'rate_first' ], + state: [ 'i', 'ii' ], }, }, @@ -153,12 +153,12 @@ describe( 'system.DeltaProcessor', () => { doc_id: 123, rdelta: { - foo: [ "first" ], + foo: [ 'first' ], state: [ undefined, undefined, null ], }, bucket: { - foo: [ "second" ], - state: [ "a", "b", "c" ], + foo: [ 'second' ], + state: [ 'a', 'b', 'c' ], }, ratedata: {}, }, @@ -167,16 +167,16 @@ describe( 'system.DeltaProcessor', () => { doc_id: 123, rdelta: { - prem: [ "rate_first" ], + prem: [ 'rate_first' ], state: [ undefined, undefined, null ], }, bucket: { - foo: [ "second" ], - state: [ "a", "b", "c" ], + foo: [ 'second' ], + state: [ 'a', 'b', 'c' ], }, ratedata: { - prem: [ "rate_second" ], - state: [ "i", "ii", "iii" ], + prem: [ 'rate_second' ], + state: [ 'i', 'ii', 'iii' ], }, }, @@ -184,12 +184,12 @@ describe( 'system.DeltaProcessor', () => { doc_id: 123, rdelta: { - foo: [ "second" ], + foo: [ 'second' ], state: [ undefined, undefined, undefined, null ], }, bucket: { - foo: [ "third" ], - state: [ "a", "b", "c", "d" ], + foo: [ 'third' ], + state: [ 'a', 'b', 'c', 'd' ], }, ratedata: {}, }, @@ -197,7 +197,7 @@ describe( 'system.DeltaProcessor', () => }, { - label: "Publishes deltas in order for multiple documents", + label: 'Publishes deltas in order for multiple documents', given: [ { @@ -205,13 +205,13 @@ describe( 'system.DeltaProcessor', () => lastUpdate: 123123123, data: { - foo: [ "first" ], - state: [ "a", "b" ], + foo: [ 'first' ], + state: [ 'a', 'b' ], }, ratedata: { - prem: [ "rate_first" ], - state: [ "i", "ii" ], + prem: [ 'rate_first' ], + state: [ 'i', 'ii' ], }, rdelta: { @@ -219,7 +219,7 @@ describe( 'system.DeltaProcessor', () => { timestamp: 1, data: { - foo: [ "" ], + foo: [ '' ], state: [ undefined, null ], }, }, @@ -229,7 +229,7 @@ describe( 'system.DeltaProcessor', () => { timestamp: 4, data: { - prem: [ "" ], + prem: [ '' ], state: [ undefined, null ], }, }, @@ -245,13 +245,13 @@ describe( 'system.DeltaProcessor', () => lastUpdate: 121212123, data: { - foo2: [ "first" ], - state: [ "a", "b" ], + foo2: [ 'first' ], + state: [ 'a', 'b' ], }, ratedata: { - prem2: [ "rate_first" ], - state: [ "i", "ii" ], + prem2: [ 'rate_first' ], + state: [ 'i', 'ii' ], }, rdelta: { @@ -259,7 +259,7 @@ describe( 'system.DeltaProcessor', () => { timestamp: 2, data: { - foo2: [ "" ], + foo2: [ '' ], state: [ undefined, null ], }, }, @@ -269,7 +269,7 @@ describe( 'system.DeltaProcessor', () => { timestamp: 3, data: { - prem2: [ "" ], + prem2: [ '' ], state: [ undefined, null ], }, }, @@ -283,12 +283,12 @@ describe( 'system.DeltaProcessor', () => { doc_id: 123, rdelta: { - foo: [ "" ], + foo: [ '' ], state: [ undefined, null ], }, bucket: { - foo: [ "first" ], - state: [ "a", "b" ], + foo: [ 'first' ], + state: [ 'a', 'b' ], }, ratedata: {}, }, @@ -297,16 +297,16 @@ describe( 'system.DeltaProcessor', () => { doc_id: 123, rdelta: { - prem: [ "" ], + prem: [ '' ], state: [ undefined, null ], }, bucket: { - foo: [ "first" ], - state: [ "a", "b" ], + foo: [ 'first' ], + state: [ 'a', 'b' ], }, ratedata: { - prem: [ "rate_first" ], - state: [ "i", "ii" ], + prem: [ 'rate_first' ], + state: [ 'i', 'ii' ], }, }, @@ -314,12 +314,12 @@ describe( 'system.DeltaProcessor', () => { doc_id: 234, rdelta: { - foo2: [ "" ], + foo2: [ '' ], state: [ undefined, null ], }, bucket: { - foo2: [ "first" ], - state: [ "a", "b" ], + foo2: [ 'first' ], + state: [ 'a', 'b' ], }, ratedata: {}, }, @@ -328,37 +328,37 @@ describe( 'system.DeltaProcessor', () => { doc_id: 234, rdelta: { - prem2: [ "" ], + prem2: [ '' ], state: [ undefined, null ], }, bucket: { - foo2: [ "first" ], - state: [ "a", "b" ], + foo2: [ 'first' ], + state: [ 'a', 'b' ], }, ratedata: { - prem2: [ "rate_first" ], - state: [ "i", "ii" ], + prem2: [ 'rate_first' ], + state: [ 'i', 'ii' ], }, }, ], }, { - label: "trims delta array based on index", + label: 'trims delta array based on index', given: [ { id: 111, lastUpdate: 123123123, - data: { foo: [ "second" ] }, + data: { foo: [ 'second' ] }, ratedata: {}, rdelta: { data: [ { - data: { foo: [ "" ] }, + data: { foo: [ '' ] }, timestamp: 123, }, { - data: { foo: [ "first" ] }, + data: { foo: [ 'first' ] }, timestamp: 234, }, ], @@ -371,8 +371,8 @@ describe( 'system.DeltaProcessor', () => expected: [ { doc_id: 111, - rdelta: { foo: [ "first" ] }, - bucket: { foo: [ "second" ] }, + rdelta: { foo: [ 'first' ] }, + bucket: { foo: [ 'second' ] }, ratedata: {} }, ], @@ -390,14 +390,14 @@ describe( 'system.DeltaProcessor', () => } publisher.publish = ( - doc_id, + meta, delta, bucket, ratedata, ): Promise => { published.push( { - doc_id: doc_id, + doc_id: meta.id, rdelta: delta.data, bucket: bucket, ratedata: ratedata, @@ -422,12 +422,19 @@ describe( 'system.DeltaProcessor', () => const dao = createMockDeltaDao(); const publisher = createMockDeltaPublisher(); const emitter = new EventEmitter(); + const entity_num = 'Some Agency'; + const entity_id = 4321; + const lastUpdate = 123123123; + const createdData = 234234234; const doc = [ { - id: 123, - lastUpdate: 123123123, - data: { foo: [ 'start_bar' ] }, - ratedata: {}, - rdelta: { + id: 123, + agentName: entity_num, + agentEntityId: entity_id, + startDate: createdData, + lastUpdate: lastUpdate, + data: { foo: [ 'start_bar' ] }, + ratedata: {}, + rdelta: { data: [ { data: { foo: [ 'first_bar' ] }, @@ -439,11 +446,14 @@ describe( 'system.DeltaProcessor', () => }, }, { - id: 234, - lastUpdate: 123123123, - data: { foo: [ 'start_bar' ] }, - ratedata: {}, - rdelta: { + id: 234, + agentName: entity_num, + agentEntityId: entity_id, + startDate: createdData, + lastUpdate: 123123123, + data: { foo: [ 'start_bar' ] }, + ratedata: {}, + rdelta: { data: [ { data: { foo: [ 'first_bar' ] }, @@ -457,13 +467,25 @@ describe( 'system.DeltaProcessor', () => const expected_published = [ { - doc_id: 123, + meta: { + entity_id: 4321, + entity_name: 'Some Agency', + id: 123, + lastUpdate: 123123123, + startDate: 234234234, + }, delta: { foo: [ 'first_bar' ] }, bucket: { foo: [ 'start_bar' ] }, ratedata: {}, }, { - doc_id: 234, + meta: { + entity_id: 4321, + entity_name: 'Some Agency', + id: 234, + lastUpdate: 123123123, + startDate: 234234234, + }, delta: { foo: [ 'first_bar' ] }, bucket: { foo: [ 'start_bar' ] }, ratedata: {}, @@ -485,14 +507,14 @@ describe( 'system.DeltaProcessor', () => } publisher.publish = ( - doc_id, + meta, delta, bucket, ratedata, ): Promise => { published.push( { - doc_id: doc_id, + meta: meta, delta: delta.data, bucket: bucket, ratedata: ratedata, @@ -525,11 +547,14 @@ describe( 'system.DeltaProcessor', () => const publisher = createMockDeltaPublisher(); const emitter = new EventEmitter(); const doc = [ { - id: 123, - lastUpdate: 123123123, - data: { foo: [ 'start_bar' ] }, - ratedata: {}, - rdelta: { + id: 123, + agentName: 'Some Agency', + agentEntityId: 4321, + startDate: 234234234, + lastUpdate: 123123123, + data: { foo: [ 'start_bar' ] }, + ratedata: {}, + rdelta: { data: [ { data: { foo: [ 'first_bar' ] }, @@ -541,11 +566,14 @@ describe( 'system.DeltaProcessor', () => }, }, { - id: 234, - lastUpdate: 123123123, - data: { foo: [ 'start_bar' ] }, - ratedata: {}, - rdelta: { + id: 234, + agentName: 'Some Agency', + agentEntityId: 4321, + startDate: 234234234, + lastUpdate: 123123123, + data: { foo: [ 'start_bar' ] }, + ratedata: {}, + rdelta: { data: [ { data: { foo: [ 'first_bar' ] }, @@ -559,7 +587,13 @@ describe( 'system.DeltaProcessor', () => // Only one is published const expected_published = [ { - doc_id: 123, + meta: { + entity_id: 4321, + entity_name: 'Some Agency', + id: 123, + lastUpdate: 123123123, + startDate: 234234234, + }, delta: { foo: [ 'first_bar' ] }, bucket: { foo: [ 'start_bar' ] }, ratedata: {}, @@ -577,14 +611,14 @@ describe( 'system.DeltaProcessor', () => Promise.reject( new Error( expected_error ) ); publisher.publish = ( - doc_id, + meta, delta, bucket, ratedata, ): Promise => { published.push( { - doc_id: doc_id, + meta, delta: delta.data, bucket: bucket, ratedata: ratedata, diff --git a/test/system/DeltaPublisherTest.ts b/test/system/DeltaPublisherTest.ts index 314c7da..f352b0c 100644 --- a/test/system/DeltaPublisherTest.ts +++ b/test/system/DeltaPublisherTest.ts @@ -22,20 +22,17 @@ import { AmqpConnection } from '../../src/system/amqp/AmqpConnection'; import { Delta, DeltaResult, DeltaType } from '../../src/bucket/delta'; import { DeltaPublisher as Sut } from '../../src/system/DeltaPublisher'; -import { DocumentId } from '../../src/document/Document'; -import { Duplex } from 'stream'; -import { EventEmitter } from "events"; +import { DocumentId, DocumentMeta } from '../../src/document/Document'; +import { EventEmitter } from 'events'; import { hasContext } from '../../src/error/ContextError'; import { AmqpError } from '../../src/error/AmqpError'; import { Channel } from 'amqplib'; -import { AvroEncoderCtr } from '../../src/system/avro/AvroFactory'; +import { MessageWriter } from '../../src/system/MessageWriter'; -import { AvroSchema } from "avro-js"; import { expect, use as chai_use } from 'chai'; chai_use( require( 'chai-as-promised' ) ); -const sinon = require( 'sinon' ); describe( 'server.DeltaPublisher', () => { @@ -49,6 +46,15 @@ describe( 'server.DeltaPublisher', () => const ratedata = createMockBucketData(); const emitter = new EventEmitter(); const conn = createMockAmqpConnection(); + const writer = createMockWriter(); + const meta = { + id: 123, + entity_name: 'Some Agency', + entity_id: 234, + startDate: 345, + lastUpdate: 456, + }; + conn.getAmqpChannel = () => { return { @@ -63,23 +69,10 @@ describe( 'server.DeltaPublisher', () => }; }; - const stub_schema = ({ - isValid() - { - // TODO: test me - }, - } ); - - const sut = new Sut( - emitter, - ts_ctr, - createMockEncoderCtor( stub_schema ), - conn, - stub_schema, - ); + const sut = new Sut( emitter, ts_ctr, conn, writer ); return expect( - sut.publish( 123, delta, bucket, ratedata ) + sut.publish( meta, delta, bucket, ratedata ) ).to.eventually.deep.equal( undefined ) .then( _ => { @@ -119,29 +112,25 @@ describe( 'server.DeltaPublisher', () => const ratedata = createMockBucketData(); const emitter = new EventEmitter(); const conn = createMockAmqpConnection(); - const doc_id = 123; + const writer = createMockWriter(); + const meta = { + id: 123, + entity_name: 'Some Agency', + entity_id: 234, + startDate: 345, + lastUpdate: 456, + }; + const expected = { - doc_id: doc_id, + doc_id: meta.id, delta_type: delta.type, delta_ts: delta.timestamp } conn.getAmqpChannel = getChannelF; - const stub_schema = ({ - isValid() - { - // TODO: test me - }, - } ); - - const result = new Sut( - emitter, - ts_ctr, - createMockEncoderCtor( stub_schema ), - conn, - stub_schema, - ).publish( doc_id, delta, bucket, ratedata ); + const result = new Sut( emitter, ts_ctr, conn, writer ) + .publish( meta, delta, bucket, ratedata ); return Promise.all( [ expect( result ).to.eventually.be.rejectedWith( @@ -158,276 +147,47 @@ describe( 'server.DeltaPublisher', () => } ) ] ); } ) ); - } ); - describe( '#avroEncode parses', () => - { - [ - { - label: 'Null value', - valid: true, - delta_data: { foo: null }, - }, - { - label: 'Null array', - valid: true, - delta_data: { foo: { "array": [ null ] } }, - }, - { - label: 'Boolean value', - valid: true, - delta_data: { foo: { "array": [ - { "boolean": true }, - ] } }, - }, - { - label: 'Simple string', - valid: true, - delta_data: { foo: { "array": [ - { "string": 'bar' }, - { "string": 'baz' }, - ] } }, - }, - { - label: 'Simple int', - valid: true, - delta_data: { foo: { "array": [ - { "double": 123 }, - ] } }, - }, - { - label: 'Nested array', - valid: true, - delta_data: { foo: { "array": [ - { "array": [ - { "string": 'bar' }, - ] }, - ] } }, - }, - { - label: 'Array with nulls', - valid: true, - delta_data: { foo: { "array": [ - { "string": 'bar' }, - { "string": 'baz' }, - null, - ] } }, - }, - { - label: 'Nested Array with mixed values', - valid: true, - delta_data: { foo: { "array": [ - { "array": [ - { "string": 'bar' }, - { "double": 123321 }, - null, - ] } - ] } }, - }, - { - label: 'Non-array', - valid: false, - delta_data: { foo: 'bar' }, - }, - { - label: 'Map objects', - valid: true, - delta_data: { "foo": { "array": [ - { "map": { - "bar": { "map": { - "baz": { "double": 1572903485000 }, - } } - } } - ] } }, - } - ].forEach( ( { label, delta_data, valid } ) => + + it( 'writer#write rejects', () => { - it( label, () => - { - const emitter = createMockEventEmitter(); - const conn = createMockAmqpConnection(); - const data = createMockData( delta_data ); + const delta = createMockDelta(); + const bucket = createMockBucketData(); + const ratedata = createMockBucketData(); + const emitter = new EventEmitter(); + const conn = createMockAmqpConnection(); + const writer = createMockWriter(); + const error = new Error( 'Bad thing happened' ); + const meta = { + id: 123, + entity_name: 'Some Agency', + entity_id: 234, + startDate: 345, + lastUpdate: 456, + }; - const stub_schema = ({ - isValid() - { - // TODO: test me - }, - } ); + writer.write = ( + _: any, + __: any, + ___: any, + ____: any, + _____: any + ): Promise => + { + return Promise.reject( error ); + }; - const sut = new Sut( - emitter, - ts_ctr, - createMockEncoderCtor( stub_schema ), - conn, - stub_schema - ); + const result = new Sut( emitter, ts_ctr, conn, writer ) + .publish( meta, delta, bucket, ratedata ); - sut.avroEncode( data ) - .then( b => - { - expect( typeof(b) ).to.equal( 'object' ); - expect( valid ).to.be.true; - } ) - .catch( _ => - { - expect( valid ).to.be.false; - } ); - } ); - } ); - } ); - - - describe( '#setDataTypes annotates', () => - { - [ - { - label: 'Null', - delta_data: null, - expected: null, - }, - { - label: 'Null Value', - delta_data: { foo: null }, - expected: { foo: null }, - }, - { - label: 'Boolean Value', - delta_data: { foo: [ true ] }, - expected: { foo: { "array": [ - { "boolean": true }, - ] } }, - }, - { - label: 'Simple string', - delta_data: { foo: [ - 'bar', - 'baz', - ] }, - expected: { foo: { "array": [ - { "string": 'bar' }, - { "string": 'baz' }, - ] } }, - }, - { - label: 'Simple int', - delta_data: { foo: [ - 123 - ] }, - expected: { foo: { "array": [ - { "double": 123 }, - ] } }, - }, - { - label: 'Nested array', - delta_data: { foo: [ - [ - 'bar', - 'baz', - ] - ] }, - expected: { foo: { "array": [ - { "array": [ - { "string": 'bar' }, - { "string": 'baz' }, - ] }, - ] } }, - }, - { - label: 'Double nested array', - delta_data: { foo: [ - [ - [ - 'bar', - 123, - null - ], - ], - ] }, - expected: { foo: { "array": [ - { "array": [ - { "array": [ - { "string": 'bar' }, - { "double": 123 }, - null, - ] }, - ] }, - ] } }, - }, - { - label: 'Array with nulls', - delta_data: { foo: [ - 'bar', - 'baz', - null - ] }, - expected: { foo: { "array": [ - { "string": 'bar' }, - { "string": 'baz' }, - null - ] } }, - }, - { - label: 'Nested Array with mixed values', - delta_data: { foo: [ - [ - 'bar', - 123321, - null, - ] - ] }, - expected: { foo: { "array": [ - { "array": [ - { "string": 'bar' }, - { "double": 123321 }, - null, - ] }, - ] } }, - }, - { - label: 'Nested Array with mixed values', - delta_data: { foo: [ - { - "bar": { - "wer": 'qaz', - "qwe": 1572903485000, - "asd": true, - "zxc": null, - }, - }, - ] }, - expected: { "foo": { "array": [ - { "map": { - "bar": { "map": { - "wer": { "string": 'qaz' }, - "qwe": { "double": 1572903485000 }, - "asd": { "boolean": true }, - "zxc": null, - } }, - } }, - ] } }, - }, - ].forEach( ( { label, delta_data, expected } ) => - { - it( label, () => - { - const encoded = 'FooBar'; - const emitter = createMockEventEmitter(); - const conn = createMockAmqpConnection(); - const avroEncoderCtr = createMockEncoder( encoded ); - const stub_schema = {}; - const sut = new Sut( - emitter, - ts_ctr, - avroEncoderCtr, - conn, - stub_schema, - ); - const actual = sut.setDataTypes( delta_data ); - - expect( actual ).to.deep.equal( expected ); - } ); - } ); + return Promise.all( [ + expect( result ).to.eventually.be.rejectedWith( error ), + result.catch( e => + { + return expect( e ).to.deep.equal( error ); + } ) + ] ); + } ) } ); } ); @@ -438,26 +198,6 @@ function ts_ctr(): UnixTimestamp } -function createMockEncoder( mock_encoded_data: string ): AvroEncoderCtr -{ - return ( _schema: AvroSchema ) => - { - const mock = sinon.mock( Duplex ); - - mock.on = ( _: string, __: any ) => {}; - mock.end = ( _: any ) => { return mock_encoded_data; }; - - return mock; - }; -} - - -function createMockEventEmitter(): EventEmitter -{ - return {}; -} - - function createMockAmqpConnection(): AmqpConnection { return { @@ -467,39 +207,6 @@ function createMockAmqpConnection(): AmqpConnection } -function createMockData( delta_data: any ): any -{ - - return { - event: { - id: 'RATE', - ts: 1573856916, - actor: 'SERVER', - step: null, - }, - document: { - id: 123123, - created: 1573856916, - modified: 1573856916, - top_visited_step: '2', - }, - data: null, - ratedata: null, - delta: { - Data: { - bucket: delta_data, - }, - }, - program: { - Program: { - id: 'quote_server', - version: 'dadaddwafdwa', - }, - }, - }; -} - - function createMockBucketData(): Record { return { @@ -518,26 +225,12 @@ function createMockDelta(): Delta } -function createMockEncoderCtor( stub_schema: AvroSchema ): - ( schema: AvroSchema ) => Duplex +function createMockWriter(): MessageWriter { - const events = void>>{}; - - const mock_duplex = ({ - on( event_name: string, callback: () => void ) + return { + write( _: any, __:any, ___:any, ____:any, _____:any ): Promise { - events[ event_name ] = callback; - }, - - end() - { - events.end(); - }, - } ); - - return ( schema: AvroSchema ): Duplex => - { - expect( schema ).to.equal( stub_schema ); - return mock_duplex; + return Promise.resolve( Buffer.from( '' ) ); + } }; -} +} \ No newline at end of file diff --git a/test/system/EventMediatorTest.ts b/test/system/EventMediatorTest.ts index caab191..abfbef8 100644 --- a/test/system/EventMediatorTest.ts +++ b/test/system/EventMediatorTest.ts @@ -62,11 +62,11 @@ describe( 'system.EventLogger captures and logs events', () => expect( method_called ).to.be.true; } ); - it( 'amqp-conn-error triggers log#warning', () => + it( 'amqp-conn-warn triggers log#warning', () => { let method_called = false; - const event_id = 'amqp-conn-error'; + const event_id = 'amqp-conn-warn'; const emitter = new EventEmitter(); const log = createMockLogger(); diff --git a/test/system/MetricsCollectorTest.ts b/test/system/MetricsCollectorTest.ts index eafc77d..9a36584 100644 --- a/test/system/MetricsCollectorTest.ts +++ b/test/system/MetricsCollectorTest.ts @@ -72,7 +72,7 @@ describe( 'system.MetricsCollector captures events and pushes metrics', () => const sut = new Sut( factory, conf, emitter, timer ); - emitter.emit( 'delta-process-error' ); + emitter.emit( 'error' ); expect( counter_called ).to.be.true; diff --git a/test/system/V1MessageWriterTest.ts b/test/system/V1MessageWriterTest.ts new file mode 100644 index 0000000..271d735 --- /dev/null +++ b/test/system/V1MessageWriterTest.ts @@ -0,0 +1,532 @@ +/** + * V1 Message Writer + * + * Copyright (C) 2010-2019 R-T Specialty, LLC. + * + * This file is part of liza. + * + * liza is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + * Tests for Version 1 of the avro message writer + */ + +import { V1MessageWriter as Sut } from '../../src/system/avro/V1MessageWriter'; +import { hasContext, context } from '../../src/error/ContextError'; +import { AvroEncoderCtr } from '../../src/system/avro/AvroFactory'; +import { Delta, DeltaResult, DeltaType } from '../../src/bucket/delta'; +import { DocumentMeta, DocumentId } from '../../src/document/Document'; +import { Duplex } from 'stream'; +import { AvroSchema } from 'avro-js'; + +import { expect, use as chai_use } from 'chai'; +chai_use( require( 'chai-as-promised' ) ); + +const sinon = require( 'sinon' ); + +describe( 'system.V1MessageWriter', () => +{ + it( 'Rejects improperly formatted data', () => + { + const delta = createMockDelta(); + const bucket = createMockBucketData(); + const ratedata = createMockBucketData(); + const error = new Error( 'Oh no' ); + const schema = createMockAvroSchema(); + const ts = 123; + const meta = { + id: 123, + entity_name: 'Some Agency', + entity_id: 234, + startDate: 345, + lastUpdate: 456, + }; + + const expected = { + invalid_paths: 'Foo', + invalid_data: 'Bar', + }; + + const error_context = context( error, expected ); + + schema.isValid = () => { throw error_context; }; + + const result = new Sut( + createMockEncoderCtor( schema ), + schema, + ).write( ts, meta, delta, bucket, ratedata ); + + return Promise.all( [ + expect( result ).to.eventually.be.rejectedWith( error ), + result.catch( e => + { + if ( !hasContext( e ) ) + { + return expect.fail(); + } + + return expect( e.context ).to.deep.equal( expected ); + } ) + ] ); + } ); + + + describe( '#avroEncode parses', () => + { + [ + { + label: 'Null value', + valid: true, + delta_data: { foo: null }, + }, + { + label: 'Null array', + valid: true, + delta_data: { foo: { 'array': [ null ] } }, + }, + { + label: 'Boolean value', + valid: true, + delta_data: { foo: { 'array': [ + { 'boolean': true }, + ] } }, + }, + { + label: 'Simple string', + valid: true, + delta_data: { foo: { 'array': [ + { 'string': 'bar' }, + { 'string': 'baz' }, + ] } }, + }, + { + label: 'Simple int', + valid: true, + delta_data: { foo: { 'array': [ + { 'double': 123 }, + ] } }, + }, + { + label: 'Nested array', + valid: true, + delta_data: { foo: { 'array': [ + { 'array': [ + { 'string': 'bar' }, + ] }, + ] } }, + }, + { + label: 'Array with nulls', + valid: true, + delta_data: { foo: { 'array': [ + { 'string': 'bar' }, + { 'string': 'baz' }, + null, + ] } }, + }, + { + label: 'Nested Array with mixed values', + valid: true, + delta_data: { foo: { 'array': [ + { 'array': [ + { 'string': 'bar' }, + { 'double': 123321 }, + null, + ] } + ] } }, + }, + { + label: 'Non-array', + valid: false, + delta_data: { foo: 'bar' }, + }, + { + label: 'Map objects', + valid: true, + delta_data: { 'foo': { 'array': [ + { 'map': { + 'bar': { 'map': { + 'baz': { 'double': 1572903485000 }, + } } + } } + ] } }, + } + ].forEach( ( { label, delta_data, valid } ) => + { + it( label, () => + { + const data = createMockData( delta_data ); + const schema = createMockAvroSchema(); + + const sut = new Sut( + createMockEncoderCtor( schema ), + schema + ); + + sut.avroEncode( data ) + .then( b => + { + expect( typeof(b) ).to.equal( 'object' ); + expect( valid ).to.be.true; + } ) + .catch( _ => + { + expect( valid ).to.be.false; + } ); + } ); + } ); + } ); + + + describe( '#setDataTypes annotates', () => + { + [ + { + label: 'Null', + delta_data: null, + expected: null, + }, + { + label: 'Null Value', + delta_data: { foo: null }, + expected: { foo: null }, + }, + { + label: 'Boolean Value', + delta_data: { foo: [ true ] }, + expected: { foo: { 'array': [ + { 'boolean': true }, + ] } }, + }, + { + label: 'Simple string', + delta_data: { foo: [ + 'bar', + 'baz', + ] }, + expected: { foo: { 'array': [ + { 'string': 'bar' }, + { 'string': 'baz' }, + ] } }, + }, + { + label: 'Simple int', + delta_data: { foo: [ + 123 + ] }, + expected: { foo: { 'array': [ + { 'double': 123 }, + ] } }, + }, + { + label: 'Nested array', + delta_data: { foo: [ + [ + 'bar', + 'baz', + ] + ] }, + expected: { foo: { 'array': [ + { 'array': [ + { 'string': 'bar' }, + { 'string': 'baz' }, + ] }, + ] } }, + }, + { + label: 'Double nested array', + delta_data: { foo: [ + [ + [ + 'bar', + 123, + null + ], + ], + ] }, + expected: { foo: { 'array': [ + { 'array': [ + { 'array': [ + { 'string': 'bar' }, + { 'double': 123 }, + null, + ] }, + ] }, + ] } }, + }, + { + label: 'Array with nulls', + delta_data: { foo: [ + 'bar', + 'baz', + null + ] }, + expected: { foo: { 'array': [ + { 'string': 'bar' }, + { 'string': 'baz' }, + null + ] } }, + }, + { + label: 'Nested Array with mixed values', + delta_data: { foo: [ + [ + 'bar', + 123321, + null, + ] + ] }, + expected: { foo: { 'array': [ + { 'array': [ + { 'string': 'bar' }, + { 'double': 123321 }, + null, + ] }, + ] } }, + }, + { + label: 'Nested Array with mixed values', + delta_data: { foo: [ + { + 'bar': { + 'wer': 'qaz', + 'qwe': 1572903485000, + 'asd': true, + 'zxc': null, + }, + }, + ] }, + expected: { 'foo': { 'array': [ + { 'map': { + 'bar': { 'map': { + 'wer': { 'string': 'qaz' }, + 'qwe': { 'double': 1572903485000 }, + 'asd': { 'boolean': true }, + 'zxc': null, + } }, + } }, + ] } }, + }, + ].forEach( ( { label, delta_data, expected } ) => + { + it( label, () => + { + const encoded = 'FooBar'; + const avroEncoderCtr = createMockEncoder( encoded ); + const stub_schema = {}; + const sut = new Sut( + avroEncoderCtr, + stub_schema, + ); + const actual = sut.setDataTypes( delta_data ); + + expect( actual ).to.deep.equal( expected ); + } ); + } ); + } ); + + + it( 'Message is formatted correctly', () => + { + const bucket = { foo: [ 'bar', 'baz' ] }; + const ratedata = {}; + const doc_id = 123; + const entity_name = 'Some Agency'; + const entity_id = 123; + const startDate = 345; + const lastUpdate = 456; + const schema = createMockAvroSchema(); + const ts = 123; + const encoder = createMockEncoderCtor( schema ); + const meta = { + id: doc_id, + entity_name: entity_name, + entity_id: entity_id, + startDate: startDate, + lastUpdate: lastUpdate, + }; + + const delta = >{ + type: 'data', + timestamp: 123123123, + data: >{}, + }; + + const expected = { + event: { + id: 'STEP_SAVE', + ts: ts, + actor: 'SERVER', + step: null, + }, + document: { + id: doc_id, + created: startDate, + modified: lastUpdate, + }, + session: { + Session: { + entity_name: entity_name, + entity_id: entity_id, + }, + }, + data: { + Data: { + bucket: { + 'foo': { 'array': [ + { 'string': 'bar' }, + { 'string': 'baz' }, + ] } + }, + }, + }, + ratedata: { + Data: { + bucket: {}, + }, + }, + delta: { + Data: { + bucket: delta.data, + }, + }, + program: { + Program: { + id: 'quote_server', + version: '', + }, + }, + }; + + let is_valid_called = false; + + schema.isValid = ( data: Record, _:any ) => + { + expect( data ).to.deep.equal( expected ); + + is_valid_called = true; + + return null; + } + + return expect( new Sut( encoder, schema ) + .write( ts, meta, delta, bucket, ratedata ) ) + .to.eventually.deep.equal( Buffer.from( '' ) ) + .then( _ => + { + expect( is_valid_called ).to.be.true; + } ) + } ); +} ); + + +function createMockEncoder( mock_encoded_data: string ): AvroEncoderCtr +{ + return ( _schema: AvroSchema ) => + { + const mock = sinon.mock( Duplex ); + + mock.on = ( _: string, __: any ) => {}; + mock.end = ( _: any ) => { return mock_encoded_data; }; + + return mock; + }; +} + + +function createMockData( delta_data: any ): any +{ + + return { + event: { + id: 'RATE', + ts: 1573856916, + actor: 'SERVER', + step: null, + }, + document: { + id: 123123, + created: 1573856916, + modified: 1573856916, + top_visited_step: '2', + }, + data: null, + ratedata: null, + delta: { + Data: { + bucket: delta_data, + }, + }, + program: { + Program: { + id: 'quote_server', + version: 'dadaddwafdwa', + }, + }, + }; +} + + +function createMockDelta(): Delta +{ + return >{ + type: 'data', + timestamp: 123123123, + data: >{}, + } +} + + +function createMockBucketData(): Record +{ + return { + foo: [ 'bar', 'baz' ] + } +} + + +function createMockEncoderCtor( stub_schema: AvroSchema ): + ( schema: AvroSchema ) => Duplex +{ + const events = void>>{}; + + const mock_duplex = ({ + on( event_name: string, callback: () => void ) + { + events[ event_name ] = callback; + }, + + end() + { + events.end(); + }, + } ); + + return ( schema: AvroSchema ): Duplex => + { + expect( schema ).to.equal( stub_schema ); + return mock_duplex; + }; +} + + +function createMockAvroSchema(): AvroSchema +{ + return { + toBuffer() { return null }, + isValid() { return null }, + encode() {}, + toString() { return '' }, + fromBuffer() { return {} }, + }; +} diff --git a/test/system/amqp/AmqpConnectionTest.ts b/test/system/amqp/AmqpConnectionTest.ts index 36a332c..1e4237d 100644 --- a/test/system/amqp/AmqpConnectionTest.ts +++ b/test/system/amqp/AmqpConnectionTest.ts @@ -41,7 +41,7 @@ describe( 'AmqpConnection', () => assertExchange() { return Promise.reject( expected_err ); }, - }); + } ); const mock_connection = ({ once() {}, @@ -49,13 +49,13 @@ describe( 'AmqpConnection', () => createChannel() { return Promise.resolve( mock_channel ); }, - }); + } ); const mock_amqp = ({ connect() { return Promise.resolve( mock_connection ); } - }); + } ); const emitter = new EventEmitter(); const conf = {}; @@ -65,5 +65,48 @@ describe( 'AmqpConnection', () => .to.eventually.be.rejectedWith( expected_err ); } ); } ); + + + describe( '#reconnect', () => + { + it( "is called when there is an error with the connection", () => + { + let reconnect_called = false; + + const mock_channel = ({ + assertExchange() { + return Promise.resolve(); + }, + } ); + + const mock_connection = Object.create( + new EventEmitter() + ); + + mock_connection.createChannel = (): any => { + return Promise.resolve( mock_channel ); + }; + + const mock_amqp = ({ + connect() { + return Promise.resolve( mock_connection ); + } + } ); + + const emitter = new EventEmitter(); + + emitter.on( 'amqp-reconnect', () => { reconnect_called = true } ); + + const conf = {}; + const sut = new Sut( mock_amqp, conf, emitter ); + + const result = sut.connect() + .then( () => mock_connection.emit( 'error' ) ) + + return expect( result ) + .to.eventually.deep.equal( true ) + .then( _ => expect( reconnect_called ).to.be.true ); + } ); + } ); } );