1
0
Fork 0

[DEV-5312] Define document meta data and restructure message writer.

Add better tests for message writer
master
Austin Schaffer 2019-12-10 17:24:19 -05:00
parent 9eb1f3afca
commit e3dded760d
24 changed files with 1303 additions and 806 deletions

12
.env
View File

@ -1,15 +1,17 @@
NODE_ENV=dev
AMQP_HOSTNAME=localhost
AMQP_HOST=localhost
AMQP_PORT=5672
AMQP_USERNAME=
AMQP_PASSWORD=
AMQP_USER=
AMQP_PASS=
AMQP_FRAMEMAX=0
AMQP_HEARTBEAT=2
AMQP_VHOST=
AMQP_EXCHANGE=
AMQP_RETRIES=30
AMQP_RETRY_WAIT=1
PROM_HOSTNAME=
PROM_HOST=
PROM_PORT=9091
PROM_PUSH_INTERVAL_MS=5000
PROM_BUCKETS_START=0
PROM_BUCKETS_WIDTH=10
PROM_BUCKETS_COUNT=10
PROCESS_INTERVAL_MS=2000

View File

@ -18,13 +18,14 @@
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
import * as amqplib from "amqplib";
import * as amqplib from 'amqplib';
import { createAmqpConfig } from '../src/system/AmqpPublisher';
import { MongoDeltaDao } from '../src/system/db/MongoDeltaDao';
import { DeltaProcessor } from '../src/system/DeltaProcessor';
import { DeltaPublisher } from '../src/system/DeltaPublisher';
import { MongoCollection } from '../src/types/mongodb';
import { createAvroEncoder } from '../src/system/avro/AvroFactory';
import { V1MessageWriter } from '../src/system/avro/V1MessageWriter';
import {
createMongoConfig,
createMongoDB,
@ -39,8 +40,9 @@ import {
createPrometheusConfig,
} from '../src/system/PrometheusFactory';
import { AmqpConnection } from '../src/system/amqp/AmqpConnection';
import { parse as avro_parse } from "avro-js";
import { parse as avro_parse } from 'avro-js';
require('dotenv-flow').config();
const amqp_conf = createAmqpConfig( process.env );
const prom_conf = createPrometheusConfig( process.env );
@ -51,12 +53,17 @@ const env = process.env.NODE_ENV || 'Unknown Environment';
const emitter = new EventEmitter();
const log = new StandardLogger( console, ts_ctr, env );
const amqp_connection = new AmqpConnection( amqplib, amqp_conf, emitter );
const publisher = new DeltaPublisher(
const message_writer = new V1MessageWriter(
createAvroEncoder,
avro_parse( __dirname + '/../src/system/avro/schema.avsc' ),
);
const publisher = new DeltaPublisher(
emitter,
ts_ctr,
createAvroEncoder,
amqp_connection,
avro_parse( __dirname + '/../src/system/avro/schema.avsc' ),
message_writer,
);
// Prometheus Metrics

View File

@ -24,9 +24,10 @@
},
"dependencies": {
"easejs": "0.2.x",
"mongodb": "1.2.14",
"amqplib": "0.5.3"
"easejs": "0.2.x",
"mongodb": "1.2.14",
"dotenv-flow": "3.1.0",
"amqplib": "0.5.3"
},
"devDependencies": {
"typescript": "~3.7",

View File

@ -81,6 +81,15 @@ export interface DeltaDocument
/** The document id */
id: DocumentId,
/** The entity name */
agentName: string,
/** The entity id */
agentEntityId: number,
/** The time the document was created */
startDate: UnixTimestamp,
/** The time the document was updated */
lastUpdate: UnixTimestamp,

View File

@ -18,7 +18,7 @@
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* The term "Quote" is synonymous with "Document"; this project is moving
* The term 'Quote' is synonymous with 'Document'; this project is moving
* more toward the latter as it is further generalized.
*/
@ -31,7 +31,29 @@ export type DocumentId = NominalType<number, 'DocumentId'>;
/**
* Quote (Document) id
*
* Where the term "Quote" is still used, this will allow for type
* Where the term 'Quote' is still used, this will allow for type
* compatibility and an easy transition.
*/
export type QuoteId = DocumentId;
/**
* Document meta data
*/
export type DocumentMeta =
{
/** The document id */
id: DocumentId,
/** The entity name */
entity_name: string,
/** The entity id */
entity_id: number,
/** The time the document was created */
startDate: UnixTimestamp,
/** The time the document was updated */
lastUpdate: UnixTimestamp,
}

View File

@ -309,7 +309,7 @@ export class MongoServerDao extends EventEmitter implements ServerDao
save_data.id = id;
save_data.pver = quote.getProgramVersion();
save_data.importDirty = 1;
save_data.published = 0;
save_data.published = false;
save_data.lastPremDate = quote.getLastPremiumDate();
save_data.initialRatedDate = quote.getRatedDate();
save_data.explicitLock = quote.getExplicitLockReason();

View File

@ -21,8 +21,8 @@
* Publish Amqp message to a queue
*/
import { DeltaResult } from "../bucket/delta";
import { DocumentId } from '../document/Document';
import { DeltaResult } from '../bucket/delta';
import { DocumentMeta } from '../document/Document';
import { Options } from 'amqplib';
@ -37,10 +37,10 @@ export function createAmqpConfig( env: NodeJS.ProcessEnv ): AmqpConfig
{
return <AmqpConfig>{
protocol: 'amqp',
hostname: env.AMQP_HOSTNAME,
hostname: env.AMQP_HOST,
port: +( env.AMQP_PORT || 0 ),
username: env.AMQP_USERNAME,
password: env.AMQP_PASSWORD,
username: env.AMQP_USER,
password: env.AMQP_PASS,
locale: 'en_US',
frameMax: +( env.AMQP_FRAMEMAX || 0 ),
heartbeat: +( env.AMQP_HEARTBEAT || 0 ),
@ -52,8 +52,9 @@ export function createAmqpConfig( env: NodeJS.ProcessEnv ): AmqpConfig
}
export interface AmqpConfig extends Options.Connect {
/** The protocol to connect with (should always be "amqp") */
export interface AmqpConfig extends Options.Connect
{
/** The protocol to connect with (should always be 'amqp') */
protocol: string;
/** The hostname to connect to */
@ -68,7 +69,7 @@ export interface AmqpConfig extends Options.Connect {
/** A password if one if required */
password?: string;
/** Locale (should always be "en_US") */
/** Locale (should always be 'en_US') */
locale: string;
/** The size in bytes of the maximum frame allowed */
@ -96,13 +97,13 @@ export interface AmqpPublisher
/**
* Publish quote message to exchange post-rating
*
* @param doc_id - The doc_id
* @param delta - The delta
* @param bucket - The bucket
* @param ratedata - The rate data bucket
* @param meta - document meta data
* @param delta - delta
* @param bucket - bucket
* @param ratedata - rate data bucket
*/
publish(
doc_id: DocumentId,
meta: DocumentMeta,
delta: DeltaResult<any>,
bucket: Record<string, any>,
ratedata?: Record<string, any>,

View File

@ -19,17 +19,17 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
import { DeltaDao } from "../system/db/DeltaDao";
import { DocumentId } from "../document/Document";
import { AmqpPublisher } from "./AmqpPublisher";
import { EventEmitter } from "events";
import { DeltaDao } from '../system/db/DeltaDao';
import { DocumentMeta } from '../document/Document';
import { AmqpPublisher } from './AmqpPublisher';
import { EventEmitter } from 'events';
import {
DeltaType,
applyDelta,
DeltaDocument,
Delta,
ReverseDelta,
} from "../bucket/delta";
} from '../bucket/delta';
/** Deltas and state of data prior to their application */
type DeltaState = [
@ -77,6 +77,11 @@ export class DeltaProcessor
}
/**
* Process the next document
*
* @param docs - list of documents to process
*/
private _processNext( docs: DeltaDocument[] ): Promise<void>
{
const doc = docs.shift();
@ -91,28 +96,38 @@ export class DeltaProcessor
}
/**
* Process an individual document
*
* @param doc - individual document to process
*/
private _processDocument( doc: DeltaDocument ): Promise<void>
{
const deltas = this._getTimestampSortedDeltas( doc );
const doc_id = doc.id;
const bucket = doc.data;
const ratedata = doc.ratedata || {};
const last_updated_ts = doc.lastUpdate;
const meta = {
id: doc.id,
entity_name: doc.agentName,
entity_id: +doc.agentEntityId,
startDate: doc.startDate,
lastUpdate: doc.lastUpdate,
};
const history = this._applyDeltas( deltas, bucket, ratedata );
return this._processNextDelta( doc_id, history )
return this._processNextDelta( meta, history )
.then( _ =>
this._dao.markDocumentAsProcessed( doc_id, last_updated_ts )
this._dao.markDocumentAsProcessed( meta.id, meta.lastUpdate )
)
.then( _ =>
{
this._emitter.emit( 'document-processed', { doc_id: doc_id } );
this._emitter.emit( 'document-processed', { doc_id: meta.id } );
} )
.catch( e =>
.catch( ( e: Error ) =>
{
this._emitter.emit( 'error', e );
return this._dao.setErrorFlag( doc_id );
return this._dao.setErrorFlag( meta.id );
} );
}
@ -179,8 +194,14 @@ export class DeltaProcessor
}
/**
* Process the next delta from the history
*
* @param meta - document meta data
* @param history - a history of deltas and their buckets (data, ratedata)
*/
private _processNextDelta(
doc_id: DocumentId,
meta: DocumentMeta,
history: DeltaState[],
): Promise<void>
{
@ -191,17 +212,14 @@ export class DeltaProcessor
const [ delta, bucket, ratedata ] = history[ 0 ];
const delta_uid = doc_id + '_' + delta.timestamp + '_' + delta.type;
const delta_uid = meta.id + '_' + delta.timestamp + '_' + delta.type;
this._emitter.emit( 'delta-process-start', delta_uid );
return this._publisher.publish( doc_id, delta, bucket, ratedata )
.then( _ => this._dao.advanceDeltaIndex( doc_id, delta.type ) )
return this._publisher.publish( meta, delta, bucket, ratedata )
.then( _ => this._dao.advanceDeltaIndex( meta.id, delta.type ) )
.then( _ => this._emitter.emit( 'delta-process-end', delta_uid ) )
.then( _ => this._processNextDelta(
doc_id,
history.slice( 1 ),
) );
.then( _ => this._processNextDelta( meta, history.slice( 1 ) ) );
}

View File

@ -23,82 +23,43 @@
import { AmqpPublisher } from './AmqpPublisher';
import { Delta } from '../bucket/delta';
import { EventEmitter } from "events";
import { DocumentId } from '../document/Document';
import { EventEmitter } from 'events';
import { DocumentMeta } from '../document/Document';
import { context } from '../error/ContextError';
import { AmqpError } from '../error/AmqpError';
import { AvroEncoderCtr } from './avro/AvroFactory';
import { MessageWriter } from './MessageWriter';
import { AmqpConnection } from './amqp/AmqpConnection';
import { AvroSchema } from "avro-js";
export class DeltaPublisher implements AmqpPublisher
{
/** A mapping of which delta type translated to which avro event */
readonly DELTA_MAP: Record<string, string> = {
data: 'STEP_SAVE',
ratedata: 'RATE',
};
/**
* Delta publisher
*
* @param _emitter - event emitter instance
* @param _ts_ctr - a timestamp constructor
* @param _encoder_ctr - a factory function to create an avro encoder
* @param _conn - the amqp connection
* @param _emitter - event emitter instance
* @param _ts_ctr - a timestamp constructor
* @param _conn - the amqp connection
* @param _writer - message writer
*/
constructor(
private readonly _emitter: EventEmitter,
private readonly _ts_ctr: () => UnixTimestamp,
private readonly _encoder_ctor: AvroEncoderCtr,
private readonly _conn: AmqpConnection,
private readonly _schema: AvroSchema,
private readonly _emitter: EventEmitter,
private readonly _ts_ctr: () => UnixTimestamp,
private readonly _conn: AmqpConnection,
private readonly _writer: MessageWriter,
) {}
/**
* Publish quote message to exchange post-rating
*
* @param doc_id - The doc_id
* @param delta - The delta
* @param bucket - The bucket
* @param ratedata - The ratedata bucket
* @param meta - document meta data
* @param delta - delta
* @param bucket - bucket
* @param ratedata - rate data bucket
*/
publish(
doc_id: DocumentId,
delta: Delta<any>,
bucket: Record<string, any>,
ratedata: Record<string, any> = {},
): Promise<void>
{
return this._sendMessage( doc_id, delta, bucket, ratedata )
.then( _ =>
{
this._emitter.emit(
'delta-publish',
{
delta: delta,
exchange: this._conn.getExchangeName(),
}
);
} );
}
/**
* Send message to exchange
*
* @param doc_id - The doc_id
* @param delta - The delta to publish
* @param bucket - The bucket
* @param ratedata - The ratedata bucket
*
* @return whether publish was successful
*/
private _sendMessage(
doc_id: DocumentId,
publish(
meta: DocumentMeta,
delta: Delta<any>,
bucket: Record<string, any>,
ratedata: Record<string, any>,
@ -106,16 +67,14 @@ export class DeltaPublisher implements AmqpPublisher
{
const ts = this._ts_ctr();
const headers = { version: 1, created: ts };
const avro_object = this._avroFormat(
return this._writer.write(
ts,
doc_id,
meta,
delta,
bucket,
ratedata,
);
return this.avroEncode( avro_object )
.then( ( avro_buffer ) =>
ratedata
).then( ( avro_buffer: Buffer ) =>
{
const channel = this._conn.getAmqpChannel();
@ -124,7 +83,7 @@ export class DeltaPublisher implements AmqpPublisher
throw context(
new AmqpError( 'Error sending message: No channel' ),
{
doc_id: doc_id,
doc_id: meta.id,
delta_type: delta.type,
delta_ts: delta.timestamp,
},
@ -144,193 +103,22 @@ export class DeltaPublisher implements AmqpPublisher
throw context(
new Error ( 'Delta publish failed' ),
{
doc_id: doc_id,
doc_id: meta.id,
delta_type: delta.type,
delta_ts: delta.timestamp,
}
);
}
} )
.then( ( _: any ) =>
{
this._emitter.emit(
'delta-publish',
{
delta: delta,
exchange: this._conn.getExchangeName(),
}
);
} );
}
/**
* Throw an error with specific information if the schema is invalid
*
* @param schema - Avro schema
* @param data - Data to encode
*/
private _assertValidAvro(
schema: AvroSchema,
data: Record<string, any>,
): void
{
schema.isValid( data, { errorHook: hook } );
function hook( keys: any, vals: any) {
throw context( new Error( 'Invalid Avro Schema' ),
{
invalid_paths: keys,
invalid_data: vals,
}
);
}
}
/**
* Format the avro data with data type labels
*
* @param ts - a timestamp
* @param doc_id - the document id
* @param delta - the current delta
* @param bucket - the data bucket
* @param ratedata - the ratedata bucket
*
* @return the formatted data
*/
private _avroFormat(
ts: UnixTimestamp,
doc_id: DocumentId,
delta: Delta<any>,
bucket: Record<string, any>,
ratedata: Record<string, any>,
): any
{
const delta_formatted = this.setDataTypes( delta.data );
const bucket_formatted = this.setDataTypes( bucket );
const ratedata_formatted = this.setDataTypes( ratedata );
const event_id = this.DELTA_MAP[ delta.type ];
return {
event: {
id: event_id,
ts: ts,
actor: 'SERVER',
step: null,
},
document: {
id: doc_id
},
data: {
Data: {
bucket: bucket_formatted,
},
},
ratedata: {
Data: {
bucket: ratedata_formatted,
},
},
delta: {
Data: {
bucket: delta_formatted,
},
},
program: {
Program: {
id: 'quote_server',
version: '',
},
},
}
}
/**
* Encode the data in an avro buffer
*
* @param data - the data to encode
*
* @return the avro buffer or null if there is an error
*/
avroEncode( data: Record<string, any> ): Promise<Buffer>
{
return new Promise<Buffer>( ( resolve, reject ) =>
{
const bufs: Buffer[] = [];
try
{
this._assertValidAvro( this._schema, data )
const encoder = this._encoder_ctor( this._schema )
encoder.on('data', ( buf: Buffer ) => { bufs.push( buf ) } )
encoder.on('error', ( err: Error ) => { reject( err ); } )
encoder.on('end', () => { resolve( Buffer.concat( bufs ) ) } )
encoder.end( data );
}
catch ( e )
{
reject( e );
}
} );
}
/**
* Format the data for avro by add type specifications to the data
*
* @param data - the data to format
*
* @return the formatted data
*/
setDataTypes( data: any, top_level: boolean = true ): any
{
let data_formatted: any = {};
switch( typeof( data ) )
{
case 'object':
if ( data == null )
{
return null;
}
else if ( Array.isArray( data ) )
{
let arr: any[] = [];
data.forEach( ( datum ) =>
{
arr.push( this.setDataTypes( datum, false ) );
} );
data_formatted = ( top_level )
? arr
: { 'array': arr };
}
else
{
let datum_formatted: any = {};
Object.keys( data).forEach( ( key: string ) =>
{
const datum = this.setDataTypes( data[ key ], false );
datum_formatted[ key ] = datum;
} );
data_formatted = ( top_level )
? datum_formatted
: { "map": datum_formatted };
}
break;
case 'boolean':
return { 'boolean': data };
case 'number':
return { 'double': data };
case 'string':
return { 'string': data };
case 'undefined':
return null;
}
return data_formatted;
}
}

View File

@ -48,7 +48,7 @@ export class EventMediator
msg
) );
this._emitter.on( 'amqp-conn-error', ( msg ) =>
this._emitter.on( 'amqp-conn-warn', ( msg ) =>
this._log.warning( 'AMQP Connection Error', msg ) );
this._emitter.on( 'amqp-reconnect', () =>
@ -68,6 +68,11 @@ export class EventMediator
}
/**
* Handle an error event
*
* @param e - the error that was emitted
*/
private _handleError( e: any ): void
{
let msg: string = '';

View File

@ -0,0 +1,44 @@
/**
* Message Writer
*
* Copyright (C) 2010-2019 R-T Specialty, LLC.
*
* This file is part of liza.
*
* liza is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* Write a message to be published to a queue
*/
import { DocumentMeta } from '../document/Document';
import { DeltaResult } from '../bucket/delta';
export interface MessageWriter
{
/**
* Write the data to a message
*
* @param ts - timestamp
* @param meta - document meta data
* @param delta - current delta
* @param bucket - data bucket
* @param ratedata - ratedata bucket
*/
write(
ts: UnixTimestamp,
meta: DocumentMeta,
delta: DeltaResult<any>,
bucket: Record<string, any>,
ratedata: Record<string, any>,
): Promise<Buffer>
}

View File

@ -22,7 +22,7 @@
*/
import { Histogram, Pushgateway, Counter, Gauge } from 'prom-client';
import { EventEmitter } from "events";
import { EventEmitter } from 'events';
import { PrometheusFactory, PrometheusConfig } from './PrometheusFactory';
const client = require( 'prom-client' )
@ -72,6 +72,7 @@ export class MetricsCollector
* @param _factory - A factory to create prometheus components
* @param _conf - Prometheus configuration
* @param _emitter - Event emitter
* @param _timer - A timer function to create a tuple timestamp
*/
constructor(
private readonly _factory: PrometheusFactory,
@ -96,9 +97,9 @@ export class MetricsCollector
client,
this._process_time_name,
this._process_time_help,
0,
10,
10,
this._conf.buckets_start,
this._conf.buckets_width,
this._conf.buckets_count,
);
this._total_error = this._factory.createCounter(
@ -123,7 +124,8 @@ export class MetricsCollector
this._push_interval = setInterval( () =>
{
this._gateway.pushAdd(
{ jobName: 'liza_delta_metrics' }, this.pushCallback
{ jobName: 'liza_delta_metrics' },
this.getPushCallback( this )
);
}, this._conf.push_interval_ms
);
@ -145,7 +147,7 @@ export class MetricsCollector
/**
* List to events to update metrics
*/
private hookMetrics()
private hookMetrics(): void
{
this._emitter.on(
'delta-process-start',
@ -165,35 +167,36 @@ export class MetricsCollector
}
);
this._emitter.on(
'delta-process-error',
( _ ) => this._total_error.inc()
);
this._emitter.on( 'error', ( _ ) => this._total_error.inc() );
}
/**
* Handle push error
*
* @param error - Any errors that occurred
* @param response - The http response
* @param body - The response body
* @param self - Metrics Collector object
*
* @return a function to handle the pushAdd callback
*/
private pushCallback(
error?: Error | undefined,
_response?: any,
_body?: any
): void
private getPushCallback( self: MetricsCollector ): () => void
{
if ( error )
return (
error?: Error | undefined,
_response?: any,
_body?: any
): void =>
{
this._emitter.emit( 'error', error );
if ( error )
{
self._emitter.emit( 'error', error );
}
}
}
/**
* Update metrics with current error count
*
* @param count - the number of errors found
*/
updateErrorCount( count: number ): void
{

View File

@ -35,6 +35,15 @@ export declare type PrometheusConfig = {
/** The rate (in milliseconds) at which metrics are pushed */
push_interval_ms: number;
/** The starting point for process time buckets */
buckets_start: number;
/** The width of process time buckets */
buckets_width: number;
/** The number of process time buckets */
buckets_count: number;
}
@ -50,10 +59,13 @@ export function createPrometheusConfig(
): PrometheusConfig
{
return <PrometheusConfig>{
'hostname': env.prom_hostname,
'port': +( env.prom_port || 0 ),
'env': process.env.NODE_ENV,
'push_interval_ms': +( process.env.prom_push_interval_ms || 5000 ),
hostname: env.PROM_HOST,
port: +( env.PROM_PORT || 0 ),
env: process.env.NODE_ENV,
push_interval_ms: +( process.env.PROM_PUSH_INTERVAL_MS || 5000 ),
buckets_start: +( process.env.PROM_BUCKETS_START || 0 ),
buckets_width: +( process.env.PROM_BUCKETS_WIDTH || 10 ),
buckets_count: +( process.env.PROM_BUCKETS_COUNT || 10 ),
};
}
@ -63,8 +75,9 @@ export class PrometheusFactory
/**
* Create a PushGateway
*
* @param client - prometheus client
* @param url - the url of the push gateway
* @param client - prometheus client
* @param hostname - push gateway url
* @param port - push gateway port
*
* @return the gateway
*/

View File

@ -160,6 +160,7 @@ export class StandardLogger implements PsrLogger
*
* @param msg - the string or object to log
* @param level - the log level
* @param context - additional message context
*
* @returns a structured logging object
*/

View File

@ -24,8 +24,6 @@ import * as amqplib from "amqplib";
/**
* Connection to AMQP exchange
*
* XXX: Needs tests!
*/
export class AmqpConnection
{
@ -39,6 +37,7 @@ export class AmqpConnection
/**
* Amqp Connection
*
* @param _conf - amqp library
* @param _conf - amqp configuration
* @param _emitter - event emitter instance
*/
@ -65,7 +64,7 @@ export class AmqpConnection
*/
this._conn.once( 'error', e =>
{
this._emitter.emit( 'amqp-conn-error', e );
this._emitter.emit( 'amqp-conn-warn', e );
this._reconnect();
} );

View File

@ -0,0 +1,259 @@
/**
* Message Writer
*
* Copyright (C) 2010-2019 R-T Specialty, LLC.
*
* This file is part of liza.
*
* liza is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* Write a message to be published to a queue
*/
import { DocumentMeta } from '../../document/Document';
import { Delta } from '../../bucket/delta';
import { AvroEncoderCtr } from '../avro/AvroFactory';
import { AvroSchema } from 'avro-js';
import { MessageWriter } from '../MessageWriter';
import { context } from '../../error/ContextError';
export class V1MessageWriter implements MessageWriter
{
/** A mapping of which delta type translated to which avro event */
readonly DELTA_MAP: Record<string, string> = {
data: 'STEP_SAVE',
ratedata: 'RATE',
};
/**
* Delta publisher
*
* @param _encoder_ctr - a factory function to create an avro encoder
* @param _conn - the amqp connection
*/
constructor(
private readonly _encoder_ctor: AvroEncoderCtr,
private readonly _schema: AvroSchema,
) {}
/**
* Write the data to a message
*
* @param ts - timestamp
* @param meta - document meta data
* @param delta - current delta
* @param bucket - data bucket
* @param ratedata - ratedata bucket
*/
write(
ts: UnixTimestamp,
meta: DocumentMeta,
delta: Delta<any>,
bucket: Record<string, any>,
ratedata: Record<string, any>,
): Promise<Buffer>
{
const avro_object = this._avroFormat(
ts,
meta,
delta,
bucket,
ratedata,
);
return this.avroEncode( avro_object );
}
/**
* Format the avro data with data type labels
*
* @param ts - timestamp
* @param meta - document meta data
* @param delta - current delta
* @param bucket - data bucket
* @param ratedata - ratedata bucket
*
* @return the formatted data
*/
private _avroFormat(
ts: UnixTimestamp,
meta: DocumentMeta,
delta: Delta<any>,
bucket: Record<string, any>,
ratedata: Record<string, any>,
): any
{
const delta_formatted = this.setDataTypes( delta.data );
const bucket_formatted = this.setDataTypes( bucket );
const ratedata_formatted = this.setDataTypes( ratedata );
const event_id = this.DELTA_MAP[ delta.type ];
return {
event: {
id: event_id,
ts: ts,
actor: 'SERVER',
step: null,
},
document: {
id: meta.id,
created: meta.startDate,
modified: meta.lastUpdate,
},
session: {
Session: {
entity_id: meta.entity_id,
entity_name: meta.entity_name,
},
},
data: {
Data: {
bucket: bucket_formatted,
},
},
ratedata: {
Data: {
bucket: ratedata_formatted,
},
},
delta: {
Data: {
bucket: delta_formatted,
},
},
program: {
Program: {
id: 'quote_server',
version: '',
},
},
}
}
/**
* Encode the data in an avro buffer
*
* @param data - the data to encode
*
* @return the avro buffer or null if there is an error
*/
avroEncode( data: Record<string, any> ): Promise<Buffer>
{
return new Promise<Buffer>( ( resolve, reject ) =>
{
const bufs: Buffer[] = [];
try
{
this._schema.isValid(
data,
{
errorHook: ( keys: any, vals: any) =>
{
throw context(
new Error( 'Invalid Avro Schema' ),
{
invalid_paths: keys,
invalid_data: vals,
}
);
}
}
);
const encoder = this._encoder_ctor( this._schema )
encoder.on('data', ( buf: Buffer ) => { bufs.push( buf ) } )
encoder.on('error', ( err: Error ) => { reject( err ); } )
encoder.on('end', () => { resolve( Buffer.concat( bufs ) ) } )
encoder.end( data );
}
catch ( e )
{
reject( e );
}
} );
}
/**
* Format the data for avro by add type specifications to the data
*
* @param data - the data to format
* @param top_level - whether we are at the top level of the recursion
*
* @return the formatted data
*/
setDataTypes( data: any, top_level: boolean = true ): any
{
let data_formatted: any = {};
switch( typeof( data ) )
{
case 'object':
if ( data == null )
{
return null;
}
else if ( Array.isArray( data ) )
{
let arr: any[] = [];
data.forEach( ( datum ) =>
{
arr.push( this.setDataTypes( datum, false ) );
} );
data_formatted = ( top_level )
? arr
: { 'array': arr };
}
else
{
let datum_formatted: any = {};
Object.keys( data).forEach( ( key: string ) =>
{
const datum = this.setDataTypes( data[ key ], false );
datum_formatted[ key ] = datum;
} );
data_formatted = ( top_level )
? datum_formatted
: { 'map': datum_formatted };
}
break;
case 'boolean':
return { 'boolean': data };
case 'number':
return { 'double': data };
case 'string':
return { 'string': data };
case 'undefined':
return null;
}
return data_formatted;
}
}

View File

@ -34,7 +34,7 @@
},
{
"name": "step",
"type":[
"type": [
"null",
{
"type": "record",
@ -77,6 +77,26 @@
]
}
},
{
"name": "session",
"type": [
"null",
{
"type": "record",
"name": "Session",
"fields": [
{
"name": "entity_name",
"type": "string"
},
{
"name": "entity_id",
"type": "int"
}
]
}
]
},
{
"name": "data",
"type": [

View File

@ -40,6 +40,9 @@ export class MongoDeltaDao implements DeltaDao
/** The document fields to read */
readonly RESULT_FIELDS: Record<string, number> = {
id: 1,
agentName: 1,
agentEntityId: 1,
startDate: 1,
lastUpdate: 1,
data: 1,
ratedata: 1,
@ -70,7 +73,7 @@ export class MongoDeltaDao implements DeltaDao
this._collection.find(
{
published: false,
deltaError: false,
deltaError: { $ne: true },
},
{ fields: this.RESULT_FIELDS },
( e, cursor ) =>

View File

@ -22,7 +22,7 @@
import { DeltaProcessor as Sut } from '../../src/system/DeltaProcessor';
import { AmqpPublisher } from '../../src/system/AmqpPublisher';
import { DeltaDao } from '../../src/system/db/DeltaDao';
import { DeltaDocument } from "../../src/bucket/delta";
import { DeltaDocument } from '../../src/bucket/delta';
import { DocumentId } from '../../src/document/Document';
import { EventEmitter } from 'events';
@ -40,7 +40,7 @@ describe( 'system.DeltaProcessor', () =>
expected: any
}[]>[
{
label: "No deltas are processed",
label: 'No deltas are processed',
given: [
{
id: 123,
@ -53,9 +53,9 @@ describe( 'system.DeltaProcessor', () =>
expected: [],
},
// when quote is initialized: { foo: [ "" ], state: [ "a" ] }
// when quote is initialized: { foo: [ '' ], state: [ 'a' ] }
{
label: "Publishes deltas in order",
label: 'Publishes deltas in order',
given: [
{
@ -63,13 +63,13 @@ describe( 'system.DeltaProcessor', () =>
lastUpdate: 123123123,
data: {
foo: [ "third" ],
state: [ "a", "b", "c", "d" ],
foo: [ 'third' ],
state: [ 'a', 'b', 'c', 'd' ],
},
ratedata: {
prem: [ "rate_second" ],
state: [ "i", "ii", "iii" ],
prem: [ 'rate_second' ],
state: [ 'i', 'ii', 'iii' ],
},
rdelta: {
@ -77,21 +77,21 @@ describe( 'system.DeltaProcessor', () =>
{
timestamp: 1,
data: {
foo: [ "" ],
foo: [ '' ],
state: [ undefined, null ],
},
},
{
timestamp: 3,
data: {
foo: [ "first" ],
foo: [ 'first' ],
state: [ undefined, undefined, null ],
},
},
{
timestamp: 5,
data: {
foo: [ "second" ],
foo: [ 'second' ],
state: [ undefined, undefined, undefined, null ],
},
},
@ -101,14 +101,14 @@ describe( 'system.DeltaProcessor', () =>
{
timestamp: 2,
data: {
prem: [ "" ],
prem: [ '' ],
state: [ undefined, null ],
},
},
{
timestamp: 4,
data: {
prem: [ "rate_first" ],
prem: [ 'rate_first' ],
state: [ undefined, undefined, null ],
},
},
@ -122,12 +122,12 @@ describe( 'system.DeltaProcessor', () =>
{
doc_id: 123,
rdelta: {
foo: [ "" ],
foo: [ '' ],
state: [ undefined, null ],
},
bucket: {
foo: [ "first" ],
state: [ "a", "b" ],
foo: [ 'first' ],
state: [ 'a', 'b' ],
},
ratedata: {},
},
@ -136,16 +136,16 @@ describe( 'system.DeltaProcessor', () =>
{
doc_id: 123,
rdelta: {
prem: [ "" ],
prem: [ '' ],
state: [ undefined, null ],
},
bucket: {
foo: [ "first" ],
state: [ "a", "b" ],
foo: [ 'first' ],
state: [ 'a', 'b' ],
},
ratedata: {
prem: [ "rate_first" ],
state: [ "i", "ii" ],
prem: [ 'rate_first' ],
state: [ 'i', 'ii' ],
},
},
@ -153,12 +153,12 @@ describe( 'system.DeltaProcessor', () =>
{
doc_id: 123,
rdelta: {
foo: [ "first" ],
foo: [ 'first' ],
state: [ undefined, undefined, null ],
},
bucket: {
foo: [ "second" ],
state: [ "a", "b", "c" ],
foo: [ 'second' ],
state: [ 'a', 'b', 'c' ],
},
ratedata: {},
},
@ -167,16 +167,16 @@ describe( 'system.DeltaProcessor', () =>
{
doc_id: 123,
rdelta: {
prem: [ "rate_first" ],
prem: [ 'rate_first' ],
state: [ undefined, undefined, null ],
},
bucket: {
foo: [ "second" ],
state: [ "a", "b", "c" ],
foo: [ 'second' ],
state: [ 'a', 'b', 'c' ],
},
ratedata: {
prem: [ "rate_second" ],
state: [ "i", "ii", "iii" ],
prem: [ 'rate_second' ],
state: [ 'i', 'ii', 'iii' ],
},
},
@ -184,12 +184,12 @@ describe( 'system.DeltaProcessor', () =>
{
doc_id: 123,
rdelta: {
foo: [ "second" ],
foo: [ 'second' ],
state: [ undefined, undefined, undefined, null ],
},
bucket: {
foo: [ "third" ],
state: [ "a", "b", "c", "d" ],
foo: [ 'third' ],
state: [ 'a', 'b', 'c', 'd' ],
},
ratedata: {},
},
@ -197,7 +197,7 @@ describe( 'system.DeltaProcessor', () =>
},
{
label: "Publishes deltas in order for multiple documents",
label: 'Publishes deltas in order for multiple documents',
given: [
{
@ -205,13 +205,13 @@ describe( 'system.DeltaProcessor', () =>
lastUpdate: 123123123,
data: {
foo: [ "first" ],
state: [ "a", "b" ],
foo: [ 'first' ],
state: [ 'a', 'b' ],
},
ratedata: {
prem: [ "rate_first" ],
state: [ "i", "ii" ],
prem: [ 'rate_first' ],
state: [ 'i', 'ii' ],
},
rdelta: {
@ -219,7 +219,7 @@ describe( 'system.DeltaProcessor', () =>
{
timestamp: 1,
data: {
foo: [ "" ],
foo: [ '' ],
state: [ undefined, null ],
},
},
@ -229,7 +229,7 @@ describe( 'system.DeltaProcessor', () =>
{
timestamp: 4,
data: {
prem: [ "" ],
prem: [ '' ],
state: [ undefined, null ],
},
},
@ -245,13 +245,13 @@ describe( 'system.DeltaProcessor', () =>
lastUpdate: 121212123,
data: {
foo2: [ "first" ],
state: [ "a", "b" ],
foo2: [ 'first' ],
state: [ 'a', 'b' ],
},
ratedata: {
prem2: [ "rate_first" ],
state: [ "i", "ii" ],
prem2: [ 'rate_first' ],
state: [ 'i', 'ii' ],
},
rdelta: {
@ -259,7 +259,7 @@ describe( 'system.DeltaProcessor', () =>
{
timestamp: 2,
data: {
foo2: [ "" ],
foo2: [ '' ],
state: [ undefined, null ],
},
},
@ -269,7 +269,7 @@ describe( 'system.DeltaProcessor', () =>
{
timestamp: 3,
data: {
prem2: [ "" ],
prem2: [ '' ],
state: [ undefined, null ],
},
},
@ -283,12 +283,12 @@ describe( 'system.DeltaProcessor', () =>
{
doc_id: 123,
rdelta: {
foo: [ "" ],
foo: [ '' ],
state: [ undefined, null ],
},
bucket: {
foo: [ "first" ],
state: [ "a", "b" ],
foo: [ 'first' ],
state: [ 'a', 'b' ],
},
ratedata: {},
},
@ -297,16 +297,16 @@ describe( 'system.DeltaProcessor', () =>
{
doc_id: 123,
rdelta: {
prem: [ "" ],
prem: [ '' ],
state: [ undefined, null ],
},
bucket: {
foo: [ "first" ],
state: [ "a", "b" ],
foo: [ 'first' ],
state: [ 'a', 'b' ],
},
ratedata: {
prem: [ "rate_first" ],
state: [ "i", "ii" ],
prem: [ 'rate_first' ],
state: [ 'i', 'ii' ],
},
},
@ -314,12 +314,12 @@ describe( 'system.DeltaProcessor', () =>
{
doc_id: 234,
rdelta: {
foo2: [ "" ],
foo2: [ '' ],
state: [ undefined, null ],
},
bucket: {
foo2: [ "first" ],
state: [ "a", "b" ],
foo2: [ 'first' ],
state: [ 'a', 'b' ],
},
ratedata: {},
},
@ -328,37 +328,37 @@ describe( 'system.DeltaProcessor', () =>
{
doc_id: 234,
rdelta: {
prem2: [ "" ],
prem2: [ '' ],
state: [ undefined, null ],
},
bucket: {
foo2: [ "first" ],
state: [ "a", "b" ],
foo2: [ 'first' ],
state: [ 'a', 'b' ],
},
ratedata: {
prem2: [ "rate_first" ],
state: [ "i", "ii" ],
prem2: [ 'rate_first' ],
state: [ 'i', 'ii' ],
},
},
],
},
{
label: "trims delta array based on index",
label: 'trims delta array based on index',
given: [
{
id: 111,
lastUpdate: 123123123,
data: { foo: [ "second" ] },
data: { foo: [ 'second' ] },
ratedata: {},
rdelta: {
data: [
{
data: { foo: [ "" ] },
data: { foo: [ '' ] },
timestamp: 123,
},
{
data: { foo: [ "first" ] },
data: { foo: [ 'first' ] },
timestamp: 234,
},
],
@ -371,8 +371,8 @@ describe( 'system.DeltaProcessor', () =>
expected: [
{
doc_id: 111,
rdelta: { foo: [ "first" ] },
bucket: { foo: [ "second" ] },
rdelta: { foo: [ 'first' ] },
bucket: { foo: [ 'second' ] },
ratedata: {}
},
],
@ -390,14 +390,14 @@ describe( 'system.DeltaProcessor', () =>
}
publisher.publish = (
doc_id,
meta,
delta,
bucket,
ratedata,
): Promise<void> =>
{
published.push( {
doc_id: doc_id,
doc_id: meta.id,
rdelta: delta.data,
bucket: bucket,
ratedata: ratedata,
@ -422,12 +422,19 @@ describe( 'system.DeltaProcessor', () =>
const dao = createMockDeltaDao();
const publisher = createMockDeltaPublisher();
const emitter = new EventEmitter();
const entity_num = 'Some Agency';
const entity_id = 4321;
const lastUpdate = <UnixTimestamp>123123123;
const createdData = <UnixTimestamp>234234234;
const doc = <DeltaDocument[]>[ {
id: <DocumentId>123,
lastUpdate: <UnixTimestamp>123123123,
data: { foo: [ 'start_bar' ] },
ratedata: {},
rdelta: {
id: <DocumentId>123,
agentName: entity_num,
agentEntityId: entity_id,
startDate: createdData,
lastUpdate: lastUpdate,
data: { foo: [ 'start_bar' ] },
ratedata: {},
rdelta: {
data: [
{
data: { foo: [ 'first_bar' ] },
@ -439,11 +446,14 @@ describe( 'system.DeltaProcessor', () =>
},
},
{
id: <DocumentId>234,
lastUpdate: <UnixTimestamp>123123123,
data: { foo: [ 'start_bar' ] },
ratedata: {},
rdelta: {
id: <DocumentId>234,
agentName: entity_num,
agentEntityId: entity_id,
startDate: createdData,
lastUpdate: <UnixTimestamp>123123123,
data: { foo: [ 'start_bar' ] },
ratedata: {},
rdelta: {
data: [
{
data: { foo: [ 'first_bar' ] },
@ -457,13 +467,25 @@ describe( 'system.DeltaProcessor', () =>
const expected_published = [
{
doc_id: 123,
meta: {
entity_id: 4321,
entity_name: 'Some Agency',
id: 123,
lastUpdate: 123123123,
startDate: 234234234,
},
delta: { foo: [ 'first_bar' ] },
bucket: { foo: [ 'start_bar' ] },
ratedata: {},
},
{
doc_id: 234,
meta: {
entity_id: 4321,
entity_name: 'Some Agency',
id: 234,
lastUpdate: 123123123,
startDate: 234234234,
},
delta: { foo: [ 'first_bar' ] },
bucket: { foo: [ 'start_bar' ] },
ratedata: {},
@ -485,14 +507,14 @@ describe( 'system.DeltaProcessor', () =>
}
publisher.publish = (
doc_id,
meta,
delta,
bucket,
ratedata,
): Promise<void> =>
{
published.push( {
doc_id: doc_id,
meta: meta,
delta: delta.data,
bucket: bucket,
ratedata: ratedata,
@ -525,11 +547,14 @@ describe( 'system.DeltaProcessor', () =>
const publisher = createMockDeltaPublisher();
const emitter = new EventEmitter();
const doc = <DeltaDocument[]>[ {
id: <DocumentId>123,
lastUpdate: <UnixTimestamp>123123123,
data: { foo: [ 'start_bar' ] },
ratedata: {},
rdelta: {
id: <DocumentId>123,
agentName: 'Some Agency',
agentEntityId: 4321,
startDate: <UnixTimestamp>234234234,
lastUpdate: <UnixTimestamp>123123123,
data: { foo: [ 'start_bar' ] },
ratedata: {},
rdelta: {
data: [
{
data: { foo: [ 'first_bar' ] },
@ -541,11 +566,14 @@ describe( 'system.DeltaProcessor', () =>
},
},
{
id: <DocumentId>234,
lastUpdate: <UnixTimestamp>123123123,
data: { foo: [ 'start_bar' ] },
ratedata: {},
rdelta: {
id: <DocumentId>234,
agentName: 'Some Agency',
agentEntityId: 4321,
startDate: <UnixTimestamp>234234234,
lastUpdate: <UnixTimestamp>123123123,
data: { foo: [ 'start_bar' ] },
ratedata: {},
rdelta: {
data: [
{
data: { foo: [ 'first_bar' ] },
@ -559,7 +587,13 @@ describe( 'system.DeltaProcessor', () =>
// Only one is published
const expected_published = [ {
doc_id: 123,
meta: {
entity_id: 4321,
entity_name: 'Some Agency',
id: 123,
lastUpdate: 123123123,
startDate: 234234234,
},
delta: { foo: [ 'first_bar' ] },
bucket: { foo: [ 'start_bar' ] },
ratedata: {},
@ -577,14 +611,14 @@ describe( 'system.DeltaProcessor', () =>
Promise.reject( new Error( expected_error ) );
publisher.publish = (
doc_id,
meta,
delta,
bucket,
ratedata,
): Promise<void> =>
{
published.push( {
doc_id: doc_id,
meta,
delta: delta.data,
bucket: bucket,
ratedata: ratedata,

View File

@ -22,20 +22,17 @@
import { AmqpConnection } from '../../src/system/amqp/AmqpConnection';
import { Delta, DeltaResult, DeltaType } from '../../src/bucket/delta';
import { DeltaPublisher as Sut } from '../../src/system/DeltaPublisher';
import { DocumentId } from '../../src/document/Document';
import { Duplex } from 'stream';
import { EventEmitter } from "events";
import { DocumentId, DocumentMeta } from '../../src/document/Document';
import { EventEmitter } from 'events';
import { hasContext } from '../../src/error/ContextError';
import { AmqpError } from '../../src/error/AmqpError';
import { Channel } from 'amqplib';
import { AvroEncoderCtr } from '../../src/system/avro/AvroFactory';
import { MessageWriter } from '../../src/system/MessageWriter';
import { AvroSchema } from "avro-js";
import { expect, use as chai_use } from 'chai';
chai_use( require( 'chai-as-promised' ) );
const sinon = require( 'sinon' );
describe( 'server.DeltaPublisher', () =>
{
@ -49,6 +46,15 @@ describe( 'server.DeltaPublisher', () =>
const ratedata = createMockBucketData();
const emitter = new EventEmitter();
const conn = createMockAmqpConnection();
const writer = createMockWriter();
const meta = <DocumentMeta>{
id: <DocumentId>123,
entity_name: 'Some Agency',
entity_id: 234,
startDate: <UnixTimestamp>345,
lastUpdate: <UnixTimestamp>456,
};
conn.getAmqpChannel = () =>
{
return <Channel>{
@ -63,23 +69,10 @@ describe( 'server.DeltaPublisher', () =>
};
};
const stub_schema = <AvroSchema>(<unknown>{
isValid()
{
// TODO: test me
},
} );
const sut = new Sut(
emitter,
ts_ctr,
createMockEncoderCtor( stub_schema ),
conn,
stub_schema,
);
const sut = new Sut( emitter, ts_ctr, conn, writer );
return expect(
sut.publish( <DocumentId>123, delta, bucket, ratedata )
sut.publish( meta, delta, bucket, ratedata )
).to.eventually.deep.equal( undefined )
.then( _ =>
{
@ -119,29 +112,25 @@ describe( 'server.DeltaPublisher', () =>
const ratedata = createMockBucketData();
const emitter = new EventEmitter();
const conn = createMockAmqpConnection();
const doc_id = <DocumentId>123;
const writer = createMockWriter();
const meta = <DocumentMeta>{
id: <DocumentId>123,
entity_name: 'Some Agency',
entity_id: 234,
startDate: <UnixTimestamp>345,
lastUpdate: <UnixTimestamp>456,
};
const expected = {
doc_id: doc_id,
doc_id: meta.id,
delta_type: delta.type,
delta_ts: delta.timestamp
}
conn.getAmqpChannel = getChannelF;
const stub_schema = <AvroSchema>(<unknown>{
isValid()
{
// TODO: test me
},
} );
const result = new Sut(
emitter,
ts_ctr,
createMockEncoderCtor( stub_schema ),
conn,
stub_schema,
).publish( doc_id, delta, bucket, ratedata );
const result = new Sut( emitter, ts_ctr, conn, writer )
.publish( meta, delta, bucket, ratedata );
return Promise.all( [
expect( result ).to.eventually.be.rejectedWith(
@ -158,276 +147,47 @@ describe( 'server.DeltaPublisher', () =>
} )
] );
} ) );
} );
describe( '#avroEncode parses', () =>
{
[
{
label: 'Null value',
valid: true,
delta_data: { foo: null },
},
{
label: 'Null array',
valid: true,
delta_data: { foo: { "array": [ null ] } },
},
{
label: 'Boolean value',
valid: true,
delta_data: { foo: { "array": [
{ "boolean": true },
] } },
},
{
label: 'Simple string',
valid: true,
delta_data: { foo: { "array": [
{ "string": 'bar' },
{ "string": 'baz' },
] } },
},
{
label: 'Simple int',
valid: true,
delta_data: { foo: { "array": [
{ "double": 123 },
] } },
},
{
label: 'Nested array',
valid: true,
delta_data: { foo: { "array": [
{ "array": [
{ "string": 'bar' },
] },
] } },
},
{
label: 'Array with nulls',
valid: true,
delta_data: { foo: { "array": [
{ "string": 'bar' },
{ "string": 'baz' },
null,
] } },
},
{
label: 'Nested Array with mixed values',
valid: true,
delta_data: { foo: { "array": [
{ "array": [
{ "string": 'bar' },
{ "double": 123321 },
null,
] }
] } },
},
{
label: 'Non-array',
valid: false,
delta_data: { foo: 'bar' },
},
{
label: 'Map objects',
valid: true,
delta_data: { "foo": { "array": [
{ "map": {
"bar": { "map": {
"baz": { "double": 1572903485000 },
} }
} }
] } },
}
].forEach( ( { label, delta_data, valid } ) =>
it( 'writer#write rejects', () =>
{
it( label, () =>
{
const emitter = createMockEventEmitter();
const conn = createMockAmqpConnection();
const data = createMockData( delta_data );
const delta = createMockDelta();
const bucket = createMockBucketData();
const ratedata = createMockBucketData();
const emitter = new EventEmitter();
const conn = createMockAmqpConnection();
const writer = createMockWriter();
const error = new Error( 'Bad thing happened' );
const meta = <DocumentMeta>{
id: <DocumentId>123,
entity_name: 'Some Agency',
entity_id: 234,
startDate: <UnixTimestamp>345,
lastUpdate: <UnixTimestamp>456,
};
const stub_schema = <AvroSchema>(<unknown>{
isValid()
{
// TODO: test me
},
} );
writer.write = (
_: any,
__: any,
___: any,
____: any,
_____: any
): Promise<Buffer> =>
{
return Promise.reject( error );
};
const sut = new Sut(
emitter,
ts_ctr,
createMockEncoderCtor( stub_schema ),
conn,
stub_schema
);
const result = new Sut( emitter, ts_ctr, conn, writer )
.publish( meta, delta, bucket, ratedata );
sut.avroEncode( data )
.then( b =>
{
expect( typeof(b) ).to.equal( 'object' );
expect( valid ).to.be.true;
} )
.catch( _ =>
{
expect( valid ).to.be.false;
} );
} );
} );
} );
describe( '#setDataTypes annotates', () =>
{
[
{
label: 'Null',
delta_data: null,
expected: null,
},
{
label: 'Null Value',
delta_data: { foo: null },
expected: { foo: null },
},
{
label: 'Boolean Value',
delta_data: { foo: [ true ] },
expected: { foo: { "array": [
{ "boolean": true },
] } },
},
{
label: 'Simple string',
delta_data: { foo: [
'bar',
'baz',
] },
expected: { foo: { "array": [
{ "string": 'bar' },
{ "string": 'baz' },
] } },
},
{
label: 'Simple int',
delta_data: { foo: [
123
] },
expected: { foo: { "array": [
{ "double": 123 },
] } },
},
{
label: 'Nested array',
delta_data: { foo: [
[
'bar',
'baz',
]
] },
expected: { foo: { "array": [
{ "array": [
{ "string": 'bar' },
{ "string": 'baz' },
] },
] } },
},
{
label: 'Double nested array',
delta_data: { foo: [
[
[
'bar',
123,
null
],
],
] },
expected: { foo: { "array": [
{ "array": [
{ "array": [
{ "string": 'bar' },
{ "double": 123 },
null,
] },
] },
] } },
},
{
label: 'Array with nulls',
delta_data: { foo: [
'bar',
'baz',
null
] },
expected: { foo: { "array": [
{ "string": 'bar' },
{ "string": 'baz' },
null
] } },
},
{
label: 'Nested Array with mixed values',
delta_data: { foo: [
[
'bar',
123321,
null,
]
] },
expected: { foo: { "array": [
{ "array": [
{ "string": 'bar' },
{ "double": 123321 },
null,
] },
] } },
},
{
label: 'Nested Array with mixed values',
delta_data: { foo: [
{
"bar": {
"wer": 'qaz',
"qwe": 1572903485000,
"asd": true,
"zxc": null,
},
},
] },
expected: { "foo": { "array": [
{ "map": {
"bar": { "map": {
"wer": { "string": 'qaz' },
"qwe": { "double": 1572903485000 },
"asd": { "boolean": true },
"zxc": null,
} },
} },
] } },
},
].forEach( ( { label, delta_data, expected } ) =>
{
it( label, () =>
{
const encoded = 'FooBar';
const emitter = createMockEventEmitter();
const conn = createMockAmqpConnection();
const avroEncoderCtr = createMockEncoder( encoded );
const stub_schema = <AvroSchema>{};
const sut = new Sut(
emitter,
ts_ctr,
avroEncoderCtr,
conn,
stub_schema,
);
const actual = sut.setDataTypes( delta_data );
expect( actual ).to.deep.equal( expected );
} );
} );
return Promise.all( [
expect( result ).to.eventually.be.rejectedWith( error ),
result.catch( e =>
{
return expect( e ).to.deep.equal( error );
} )
] );
} )
} );
} );
@ -438,26 +198,6 @@ function ts_ctr(): UnixTimestamp
}
function createMockEncoder( mock_encoded_data: string ): AvroEncoderCtr
{
return ( _schema: AvroSchema ) =>
{
const mock = sinon.mock( Duplex );
mock.on = ( _: string, __: any ) => {};
mock.end = ( _: any ) => { return mock_encoded_data; };
return mock;
};
}
function createMockEventEmitter(): EventEmitter
{
return <EventEmitter>{};
}
function createMockAmqpConnection(): AmqpConnection
{
return <AmqpConnection>{
@ -467,39 +207,6 @@ function createMockAmqpConnection(): AmqpConnection
}
function createMockData( delta_data: any ): any
{
return {
event: {
id: 'RATE',
ts: 1573856916,
actor: 'SERVER',
step: null,
},
document: {
id: 123123,
created: 1573856916,
modified: 1573856916,
top_visited_step: '2',
},
data: null,
ratedata: null,
delta: {
Data: {
bucket: delta_data,
},
},
program: {
Program: {
id: 'quote_server',
version: 'dadaddwafdwa',
},
},
};
}
function createMockBucketData(): Record<string, any>
{
return {
@ -518,26 +225,12 @@ function createMockDelta(): Delta<any>
}
function createMockEncoderCtor( stub_schema: AvroSchema ):
( schema: AvroSchema ) => Duplex
function createMockWriter(): MessageWriter
{
const events = <Record<string, () => void>>{};
const mock_duplex = <Duplex>(<unknown>{
on( event_name: string, callback: () => void )
return <MessageWriter>{
write( _: any, __:any, ___:any, ____:any, _____:any ): Promise<Buffer>
{
events[ event_name ] = callback;
},
end()
{
events.end();
},
} );
return ( schema: AvroSchema ): Duplex =>
{
expect( schema ).to.equal( stub_schema );
return mock_duplex;
return Promise.resolve( Buffer.from( '' ) );
}
};
}
}

View File

@ -62,11 +62,11 @@ describe( 'system.EventLogger captures and logs events', () =>
expect( method_called ).to.be.true;
} );
it( 'amqp-conn-error triggers log#warning', () =>
it( 'amqp-conn-warn triggers log#warning', () =>
{
let method_called = false;
const event_id = 'amqp-conn-error';
const event_id = 'amqp-conn-warn';
const emitter = new EventEmitter();
const log = createMockLogger();

View File

@ -72,7 +72,7 @@ describe( 'system.MetricsCollector captures events and pushes metrics', () =>
const sut = new Sut( factory, conf, emitter, timer );
emitter.emit( 'delta-process-error' );
emitter.emit( 'error' );
expect( counter_called ).to.be.true;

View File

@ -0,0 +1,532 @@
/**
* V1 Message Writer
*
* Copyright (C) 2010-2019 R-T Specialty, LLC.
*
* This file is part of liza.
*
* liza is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* Tests for Version 1 of the avro message writer
*/
import { V1MessageWriter as Sut } from '../../src/system/avro/V1MessageWriter';
import { hasContext, context } from '../../src/error/ContextError';
import { AvroEncoderCtr } from '../../src/system/avro/AvroFactory';
import { Delta, DeltaResult, DeltaType } from '../../src/bucket/delta';
import { DocumentMeta, DocumentId } from '../../src/document/Document';
import { Duplex } from 'stream';
import { AvroSchema } from 'avro-js';
import { expect, use as chai_use } from 'chai';
chai_use( require( 'chai-as-promised' ) );
const sinon = require( 'sinon' );
describe( 'system.V1MessageWriter', () =>
{
// Schema validation failures must surface as a rejected promise whose
// error retains its diagnostic context (the invalid paths/data)
it( 'Rejects improperly formatted data', () =>
{
    const delta    = createMockDelta();
    const bucket   = createMockBucketData();
    const ratedata = createMockBucketData();
    const error    = new Error( 'Oh no' );
    const schema   = createMockAvroSchema();
    const ts       = <UnixTimestamp>123;

    const meta = <DocumentMeta>{
        id:          <DocumentId>123,
        entity_name: 'Some Agency',
        entity_id:   234,
        startDate:   <UnixTimestamp>345,
        lastUpdate:  <UnixTimestamp>456,
    };

    // context that must be preserved verbatim on the rejection
    const expected = {
        invalid_paths: 'Foo',
        invalid_data:  'Bar',
    };

    const error_context = context( error, expected );

    // force validation to fail with a context-carrying error
    schema.isValid = () => { throw error_context; };

    const result = new Sut(
        createMockEncoderCtor( schema ),
        schema,
    ).write( ts, meta, delta, bucket, ratedata );

    return Promise.all( [
        expect( result ).to.eventually.be.rejectedWith( error ),
        result.catch( e =>
        {
            if ( !hasContext( e ) )
            {
                return expect.fail();
            }

            return expect( e.context ).to.deep.equal( expected );
        } )
    ] );
} );
// #avroEncode must resolve for well-formed (avro-annotated) delta data
// and reject for malformed data
describe( '#avroEncode parses', () =>
{
    [
        {
            label:      'Null value',
            valid:      true,
            delta_data: { foo: null },
        },
        {
            label:      'Null array',
            valid:      true,
            delta_data: { foo: { 'array': [ null ] } },
        },
        {
            label:      'Boolean value',
            valid:      true,
            delta_data: { foo: { 'array': [
                { 'boolean': true },
            ] } },
        },
        {
            label:      'Simple string',
            valid:      true,
            delta_data: { foo: { 'array': [
                { 'string': 'bar' },
                { 'string': 'baz' },
            ] } },
        },
        {
            label:      'Simple int',
            valid:      true,
            delta_data: { foo: { 'array': [
                { 'double': 123 },
            ] } },
        },
        {
            label:      'Nested array',
            valid:      true,
            delta_data: { foo: { 'array': [
                { 'array': [
                    { 'string': 'bar' },
                ] },
            ] } },
        },
        {
            label:      'Array with nulls',
            valid:      true,
            delta_data: { foo: { 'array': [
                { 'string': 'bar' },
                { 'string': 'baz' },
                null,
            ] } },
        },
        {
            label:      'Nested Array with mixed values',
            valid:      true,
            delta_data: { foo: { 'array': [
                { 'array': [
                    { 'string': 'bar' },
                    { 'double': 123321 },
                    null,
                ] }
            ] } },
        },
        {
            label:      'Non-array',
            valid:      false,
            delta_data: { foo: 'bar' },
        },
        {
            label:      'Map objects',
            valid:      true,
            delta_data: { 'foo': { 'array': [
                { 'map': {
                    'bar': { 'map': {
                        'baz': { 'double': 1572903485000 },
                    } }
                } }
            ] } },
        }
    ].forEach( ( { label, delta_data, valid } ) =>
    {
        it( label, () =>
        {
            const data   = createMockData( delta_data );
            const schema = createMockAvroSchema();

            const sut = new Sut(
                createMockEncoderCtor( schema ),
                schema
            );

            // BUGFIX: the promise chain was previously not returned, so
            // mocha never awaited it and assertion failures were silently
            // dropped.  A two-argument #then is used (rather than
            // #then().catch()) so that a failing resolution assertion is
            // not swallowed by the rejection handler.
            return sut.avroEncode( data ).then(
                b =>
                {
                    // resolution implies the data was valid
                    expect( typeof(b) ).to.equal( 'object' );
                    expect( valid ).to.be.true;
                },
                _ =>
                {
                    // rejection implies the data was invalid
                    expect( valid ).to.be.false;
                },
            );
        } );
    } );
} );
// #setDataTypes wraps raw bucket values in avro union type annotations
// (e.g. [ 'bar' ] -> { 'array': [ { 'string': 'bar' } ] }); numbers are
// tagged 'double' and nulls pass through untagged
describe( '#setDataTypes annotates', () =>
{
    [
        {
            label:      'Null',
            delta_data: null,
            expected:   null,
        },
        {
            label:      'Null Value',
            delta_data: { foo: null },
            expected:   { foo: null },
        },
        {
            label:      'Boolean Value',
            delta_data: { foo: [ true ] },
            expected:   { foo: { 'array': [
                { 'boolean': true },
            ] } },
        },
        {
            label:      'Simple string',
            delta_data: { foo: [
                'bar',
                'baz',
            ] },
            expected: { foo: { 'array': [
                { 'string': 'bar' },
                { 'string': 'baz' },
            ] } },
        },
        {
            label:      'Simple int',
            delta_data: { foo: [
                123
            ] },
            expected: { foo: { 'array': [
                { 'double': 123 },
            ] } },
        },
        {
            label:      'Nested array',
            delta_data: { foo: [
                [
                    'bar',
                    'baz',
                ]
            ] },
            expected: { foo: { 'array': [
                { 'array': [
                    { 'string': 'bar' },
                    { 'string': 'baz' },
                ] },
            ] } },
        },
        {
            label:      'Double nested array',
            delta_data: { foo: [
                [
                    [
                        'bar',
                        123,
                        null
                    ],
                ],
            ] },
            expected: { foo: { 'array': [
                { 'array': [
                    { 'array': [
                        { 'string': 'bar' },
                        { 'double': 123 },
                        null,
                    ] },
                ] },
            ] } },
        },
        {
            label:      'Array with nulls',
            delta_data: { foo: [
                'bar',
                'baz',
                null
            ] },
            expected: { foo: { 'array': [
                { 'string': 'bar' },
                { 'string': 'baz' },
                null
            ] } },
        },
        {
            label:      'Nested Array with mixed values',
            delta_data: { foo: [
                [
                    'bar',
                    123321,
                    null,
                ]
            ] },
            expected: { foo: { 'array': [
                { 'array': [
                    { 'string': 'bar' },
                    { 'double': 123321 },
                    null,
                ] },
            ] } },
        },
        {
            label:      'Nested Array with mixed values',
            delta_data: { foo: [
                {
                    'bar': {
                        'wer': 'qaz',
                        'qwe': 1572903485000,
                        'asd': true,
                        'zxc': null,
                    },
                },
            ] },
            expected: { 'foo': { 'array': [
                { 'map': {
                    'bar': { 'map': {
                        'wer': { 'string': 'qaz' },
                        'qwe': { 'double': 1572903485000 },
                        'asd': { 'boolean': true },
                        'zxc': null,
                    } },
                } },
            ] } },
        },
    ].forEach( ( { label, delta_data, expected } ) =>
    {
        it( label, () =>
        {
            const encoded        = 'FooBar';
            const avroEncoderCtr = createMockEncoder( encoded );

            // schema is unused by #setDataTypes, so an empty stub suffices
            const stub_schema = <AvroSchema>{};

            const sut = new Sut(
                avroEncoderCtr,
                stub_schema,
            );

            const actual = sut.setDataTypes( delta_data );

            expect( actual ).to.deep.equal( expected );
        } );
    } );
} );
// End-to-end check of the message structure handed to schema validation
// by #write; the asserted shape is the v1 message contract
it( 'Message is formatted correctly', () =>
{
    const bucket      = { foo: [ 'bar', 'baz' ] };
    const ratedata    = {};
    const doc_id      = <DocumentId>123;
    const entity_name = 'Some Agency';
    const entity_id   = 123;
    const startDate   = <UnixTimestamp>345;
    const lastUpdate  = <UnixTimestamp>456;
    const schema      = createMockAvroSchema();
    const ts          = <UnixTimestamp>123;
    const encoder     = createMockEncoderCtor( schema );

    const meta = <DocumentMeta>{
        id:          doc_id,
        entity_name: entity_name,
        entity_id:   entity_id,
        startDate:   startDate,
        lastUpdate:  lastUpdate,
    };

    const delta = <Delta<any>>{
        type:      <DeltaType>'data',
        timestamp: <UnixTimestamp>123123123,
        data:      <DeltaResult<any>>{},
    };

    const expected = {
        event: {
            id:    'STEP_SAVE',
            ts:    ts,
            actor: 'SERVER',
            step:  null,
        },
        document: {
            id:       doc_id,
            created:  startDate,   // created maps from meta.startDate
            modified: lastUpdate,  // modified maps from meta.lastUpdate
        },
        session: {
            Session: {
                entity_name: entity_name,
                entity_id:   entity_id,
            },
        },
        data: {
            Data: {
                // bucket data is expected to be avro type-annotated
                bucket: {
                    'foo': { 'array': [
                        { 'string': 'bar' },
                        { 'string': 'baz' },
                    ] }
                },
            },
        },
        ratedata: {
            Data: {
                bucket: {},
            },
        },
        delta: {
            Data: {
                bucket: delta.data,
            },
        },
        program: {
            Program: {
                id:      'quote_server',
                version: '',
            },
        },
    };

    let is_valid_called = false;

    // capture the message handed to schema validation and compare it
    // against the expected v1 structure
    schema.isValid = ( data: Record<string, any>, _:any ) =>
    {
        expect( data ).to.deep.equal( expected );

        is_valid_called = true;

        return null;
    }

    return expect( new Sut( encoder, schema )
        .write( ts, meta, delta, bucket, ratedata ) )
        .to.eventually.deep.equal( Buffer.from( '' ) )
        .then( _ =>
        {
            expect( is_valid_called ).to.be.true;
        } )
} );
} );
/**
 * Create an encoder constructor whose stream discards event registrations
 * and yields the supplied pre-encoded data when ended
 *
 * @param mock_encoded_data - data for the stub stream to produce on #end
 *
 * @return encoder constructor producing the stub stream
 */
function createMockEncoder( mock_encoded_data: string ): AvroEncoderCtr
{
    return ( _schema: AvroSchema ) =>
    {
        const stub_stream = sinon.mock( Duplex );

        stub_stream.on  = ( _event: string, _cb: any ) => {};
        stub_stream.end = ( _chunk: any ) => mock_encoded_data;

        return stub_stream;
    };
}
/**
 * Build a complete mock avro message with the given delta bucket data
 *
 * All fields other than the delta bucket are fixed values.
 *
 * @param delta_data - bucket data to embed in the message's delta
 *
 * @return mock message object
 */
function createMockData( delta_data: any ): any
{
    // single fixed timestamp shared by the event and document sections
    const ts = 1573856916;

    return {
        event: {
            id:    'RATE',
            ts:    ts,
            actor: 'SERVER',
            step:  null,
        },
        document: {
            id:               123123,
            created:          ts,
            modified:         ts,
            top_visited_step: '2',
        },
        data:     null,
        ratedata: null,
        delta:    { Data: { bucket: delta_data } },
        program:  { Program: { id: 'quote_server', version: 'dadaddwafdwa' } },
    };
}
/**
 * Create a minimal data-type delta with a fixed timestamp and empty data
 *
 * @return mock delta
 */
function createMockDelta(): Delta<any>
{
    const delta = {
        type:      <DeltaType>'data',
        timestamp: <UnixTimestamp>123123123,
        data:      <DeltaResult<any>>{},
    };

    return <Delta<any>>delta;
}
/**
 * Create simple mock bucket data with a single multi-value field
 *
 * @return mock bucket data
 */
function createMockBucketData(): Record<string, any>
{
    const data: Record<string, any> = {};

    data.foo = [ 'bar', 'baz' ];

    return data;
}
/**
 * Create an encoder constructor producing a stub Duplex stream
 *
 * The returned constructor asserts that it receives the expected schema.
 * The stub records callbacks registered via #on and invokes the
 * registered 'end' callback when the stream itself is ended.
 *
 * @param stub_schema - schema the constructor is expected to receive
 *
 * @return encoder constructor producing the stub stream
 */
function createMockEncoderCtor( stub_schema: AvroSchema ):
    ( schema: AvroSchema ) => Duplex
{
    const events = <Record<string, () => void>>{};

    const mock_duplex = <Duplex>(<unknown>{
        on( event_name: string, callback: () => void )
        {
            // remember the callback so that #end can trigger it
            events[ event_name ] = callback;
        },

        end()
        {
            // assumes an 'end' listener was registered before the stream
            // is ended — otherwise this throws; TODO confirm intended
            events.end();
        },
    } );

    return ( schema: AvroSchema ): Duplex =>
    {
        expect( schema ).to.equal( stub_schema );
        return mock_duplex;
    };
}
/**
 * Create a stub avro schema whose methods are all inert
 *
 * Tests overwrite individual methods (e.g. #isValid) as needed.
 *
 * @return stub schema
 */
function createMockAvroSchema(): AvroSchema
{
    const stub = {
        toBuffer()   { return null },
        isValid()    { return null },
        encode()     {},
        toString()   { return '' },
        fromBuffer() { return {} },
    };

    return <AvroSchema>stub;
}

View File

@ -41,7 +41,7 @@ describe( 'AmqpConnection', () =>
assertExchange() {
return Promise.reject( expected_err );
},
});
} );
const mock_connection = <amqplib.Connection>(<unknown>{
once() {},
@ -49,13 +49,13 @@ describe( 'AmqpConnection', () =>
createChannel() {
return Promise.resolve( mock_channel );
},
});
} );
const mock_amqp = <typeof amqplib>(<unknown>{
connect() {
return Promise.resolve( mock_connection );
}
});
} );
const emitter = new EventEmitter();
const conf = <AmqpConfig>{};
@ -65,5 +65,48 @@ describe( 'AmqpConnection', () =>
.to.eventually.be.rejectedWith( expected_err );
} );
} );
describe( '#reconnect', () =>
{
    it( "is called when there is an error with the connection", () =>
    {
        let reconnect_called = false;

        const mock_channel = <amqplib.Channel>(<unknown>{
            assertExchange() {
                return Promise.resolve();
            },
        } );

        // a real EventEmitter underlies the mock connection so that its
        // 'error' event can actually be emitted below
        const mock_connection = <amqplib.Connection>Object.create(
            new EventEmitter()
        );

        mock_connection.createChannel = (): any => {
            return Promise.resolve( mock_channel );
        };

        const mock_amqp = <typeof amqplib>(<unknown>{
            connect() {
                return Promise.resolve( mock_connection );
            }
        } );

        const emitter = new EventEmitter();

        // the SUT is expected to emit 'amqp-reconnect' while handling
        // the connection error
        emitter.on( 'amqp-reconnect', () => { reconnect_called = true } );

        const conf = <AmqpConfig>{};
        const sut  = new Sut( mock_amqp, conf, emitter );

        const result = sut.connect()
            .then( () => mock_connection.emit( 'error' ) )

        // emit() returning true implies a listener was attached to the
        // connection's 'error' event — presumably by the SUT during
        // #connect; verify against the implementation
        return expect( result )
            .to.eventually.deep.equal( true )
            .then( _ => expect( reconnect_called ).to.be.true );
    } );
} );
} );