diff --git a/bin/delta-processor.ts b/bin/delta-processor.ts index 3fd6cb7..e77dd6b 100644 --- a/bin/delta-processor.ts +++ b/bin/delta-processor.ts @@ -48,7 +48,7 @@ const amqp_conf = createAmqpConfig( process.env ); const prom_conf = createPrometheusConfig( process.env ); const db_conf = createMongoConfig( process.env ); const db = createMongoDB( db_conf ); -const process_interval_ms = +( process.env.process_interval_ms || 10000 ); +const process_interval_ms = +( process.env.PROCESS_INTERVAL_MS || 10000 ); const env = process.env.NODE_ENV || 'Unknown Environment'; const emitter = new EventEmitter(); const log = new StandardLogger( console, ts_ctr, env ); diff --git a/src/bucket/delta.ts b/src/bucket/delta.ts index 7c5dd6a..bc48dd1 100644 --- a/src/bucket/delta.ts +++ b/src/bucket/delta.ts @@ -81,6 +81,9 @@ export interface DeltaDocument /** The document id */ id: DocumentId, + /** The source program */ + programId: string, + /** The entity name */ agentName: string, diff --git a/src/document/Document.ts b/src/document/Document.ts index 8f05bac..5dd3aef 100644 --- a/src/document/Document.ts +++ b/src/document/Document.ts @@ -45,6 +45,9 @@ export type DocumentMeta = /** The document id */ id: DocumentId, + /** The source program */ + program: string, + /** The entity name */ entity_name: string, diff --git a/src/system/DeltaProcessor.ts b/src/system/DeltaProcessor.ts index 9553a44..b9d792f 100644 --- a/src/system/DeltaProcessor.ts +++ b/src/system/DeltaProcessor.ts @@ -22,7 +22,7 @@ import { DeltaDao } from '../system/db/DeltaDao'; import { DocumentMeta } from '../document/Document'; import { AmqpPublisher } from './AmqpPublisher'; -import { context } from '../error/ContextError'; +import { context, hasContext } from '../error/ContextError'; import { EventEmitter } from 'events'; import { DeltaType, @@ -109,6 +109,7 @@ export class DeltaProcessor const ratedata = doc.ratedata || {}; const meta = { id: doc.id, + program: doc.programId, entity_name: doc.agentName, entity_id: +doc.agentEntityId, startDate: doc.startDate, @@ -133,15 +134,23 @@ export class DeltaProcessor } ) .catch( ( e: Error ) => { - const context_error = context( - e, - { - doc_id: meta.id, - quote_id: meta.id, - }, - ); + if ( hasContext( e ) ) + { + const combined_context: Record = {}; + const error_context = e.context; - this._emitter.emit( 'error', context_error ); + Object.keys( error_context ).forEach( ( key: string ) => + { + combined_context[ key ] = error_context[ key ]; + } ); + + combined_context[ 'doc_id' ] = meta.id; + combined_context[ 'quote_id' ] = meta.id; + + e = context( e, combined_context ); + } + + this._emitter.emit( 'error', e ); return this._dao.setErrorFlag( meta.id ); } ); } diff --git a/src/system/DeltaPublisher.ts b/src/system/DeltaPublisher.ts index ddb5877..93a81ac 100644 --- a/src/system/DeltaPublisher.ts +++ b/src/system/DeltaPublisher.ts @@ -119,7 +119,8 @@ export class DeltaPublisher implements AmqpPublisher { doc_id: meta.id, quote_id: meta.id, - delta: delta, + type: delta.type, + delta_ts: delta.timestamp, exchange: this._conn.getExchangeName(), } ); diff --git a/src/system/avro/V1MessageWriter.ts b/src/system/avro/V1MessageWriter.ts index 216ed25..c6f491e 100644 --- a/src/system/avro/V1MessageWriter.ts +++ b/src/system/avro/V1MessageWriter.ts @@ -139,7 +139,7 @@ export class V1MessageWriter implements MessageWriter }, program: { Program: { - id: 'quote_server', + id: meta.program, version: '', }, }, @@ -196,12 +196,12 @@ export class V1MessageWriter implements MessageWriter /** * Format the data for avro by add type specifications to the data * - * @param data - the data to format - * @param depth - recursion depth + * @param data - the data to format + * @param top_level - whether we are at the top level of the recursion * * @return the formatted data */ - setDataTypes( data: any, depth: number = 0 ): any + setDataTypes( data: any, top_level: boolean = true ): any { let data_formatted: any = {}; @@ -210,7 +210,7 @@ export class V1MessageWriter implements MessageWriter case 'object': if ( data == null ) { - data_formatted = null; + return null; } else if ( Array.isArray( data ) ) { @@ -218,10 +218,10 @@ export class V1MessageWriter implements MessageWriter data.forEach( ( datum ) => { - arr.push( this.setDataTypes( datum, depth + 1 ) ); + arr.push( this.setDataTypes( datum, false ) ); } ); - data_formatted = ( depth < 1 ) + data_formatted = ( top_level ) ? arr : { 'array': arr }; } @@ -231,36 +231,38 @@ export class V1MessageWriter implements MessageWriter Object.keys( data).forEach( ( key: string ) => { - const datum = this.setDataTypes( data[ key ], depth + 1 ); + // Do not include "private" keys + if ( key.startsWith( '__' ) ) + { + return; + } + + const datum = this.setDataTypes( data[ key ], false ); datum_formatted[ key ] = datum; } ); - data_formatted = ( depth < 1 ) + data_formatted = ( top_level ) ? datum_formatted : { 'map': datum_formatted }; } break; case 'boolean': - return { 'bucket': { 'map': { 'boolean': data } } }; + return { 'boolean': data }; case 'number': - return { 'bucket': { 'map': { 'double': data } } }; + return { 'double': data }; case 'string': - return { 'bucket': { 'map': { 'string': data } } }; + return { 'string': data }; case 'undefined': - return { 'bucket': { 'map': null } }; - } - - if ( depth > 1 ) - { - return { 'bucket': { 'map': data_formatted } }; + return null; } return data_formatted; } + } \ No newline at end of file diff --git a/src/system/avro/schema.avsc b/src/system/avro/schema.avsc index d19d881..63bbc7d 100644 --- a/src/system/avro/schema.avsc +++ b/src/system/avro/schema.avsc @@ -123,16 +123,50 @@ "type": "map", "values": [ "null", - "boolean", - "double", - "string", { "type": "array", - "items": "Data" - }, - { - "type": "map", - "values": "Data" + "items": [ + "null", + "boolean", + "double", + "string", + { + "type": "array", + "items": [ + "null", + "boolean", + "double", + "string", + { + "type": "array", + "items": [ + "null", + "boolean", + "double", + "string" + ] + } + ] + }, + { + "type": "map", + "values": [ + "null", + "boolean", + "double", + "string", + { + "type": "map", + "values": [ + "null", + "boolean", + "double", + "string" + ] + } + ] + } + ] } ] } diff --git a/src/system/db/MongoDeltaDao.ts b/src/system/db/MongoDeltaDao.ts index f3507d8..b01ddc7 100644 --- a/src/system/db/MongoDeltaDao.ts +++ b/src/system/db/MongoDeltaDao.ts @@ -40,6 +40,7 @@ export class MongoDeltaDao implements DeltaDao /** The document fields to read */ readonly RESULT_FIELDS: Record = { id: 1, + programId: 1, agentName: 1, agentEntityId: 1, startDate: 1, diff --git a/test/system/DeltaProcessorTest.ts b/test/system/DeltaProcessorTest.ts index 1587bb2..84f955a 100644 --- a/test/system/DeltaProcessorTest.ts +++ b/test/system/DeltaProcessorTest.ts @@ -428,6 +428,7 @@ describe( 'system.DeltaProcessor', () => const createdData = 234234234; const doc = [ { id: 123, + programId: 'mega', agentName: entity_num, agentEntityId: entity_id, startDate: createdData, @@ -447,6 +448,7 @@ describe( 'system.DeltaProcessor', () => }, { id: 234, + programId: 'mega', agentName: entity_num, agentEntityId: entity_id, startDate: createdData, @@ -471,6 +473,7 @@ describe( 'system.DeltaProcessor', () => entity_id: 4321, entity_name: 'Some Agency', id: 123, + program: 'mega', lastUpdate: 123123123, startDate: 234234234, }, @@ -483,6 +486,7 @@ describe( 'system.DeltaProcessor', () => entity_id: 4321, entity_name: 'Some Agency', id: 234, + program: 'mega', lastUpdate: 123123123, startDate: 234234234, }, @@ -548,6 +552,7 @@ describe( 'system.DeltaProcessor', () => const emitter = new EventEmitter(); const doc = [ { id: 123, + programId: 'mega', agentName: 'Some Agency', agentEntityId: 4321, startDate: 234234234, @@ -567,6 +572,7 @@ describe( 'system.DeltaProcessor', () => }, { id: 234, + programId: 'mega', agentName: 'Some Agency', agentEntityId: 4321, startDate: 234234234, @@ -591,6 +597,7 @@ describe( 'system.DeltaProcessor', () => entity_id: 4321, entity_name: 'Some Agency', id: 123, + program: 'mega', lastUpdate: 123123123, startDate: 234234234, }, diff --git a/test/system/V1MessageWriterTest.ts b/test/system/V1MessageWriterTest.ts index 95b7771..a0931c0 100644 --- a/test/system/V1MessageWriterTest.ts +++ b/test/system/V1MessageWriterTest.ts @@ -95,59 +95,57 @@ describe( 'system.V1MessageWriter', () => { label: 'Null array', valid: true, - delta_data: { foo: { 'array': [ - { 'bucket': { 'map': null } } - ] } }, + delta_data: { foo: { 'array': [ null ] } }, }, { label: 'Boolean value', valid: true, delta_data: { foo: { 'array': [ - { 'bucket': { 'map': { 'boolean': true } } }, + { 'boolean': true }, ] } }, }, { label: 'Simple string', valid: true, delta_data: { foo: { 'array': [ - { 'bucket': { 'map': { 'string': 'bar' } } }, - { 'bucket': { 'map': { 'string': 'baz' } } }, + { 'string': 'bar' }, + { 'string': 'baz' }, ] } }, }, { label: 'Simple int', valid: true, delta_data: { foo: { 'array': [ - { 'bucket': { 'map': { 'double': 123 } } }, + { 'double': 123 }, ] } }, }, { label: 'Nested array', valid: true, delta_data: { foo: { 'array': [ - { 'bucket': { 'map': { 'array': [ - { 'bucket': { 'map': { 'string': 'bar' } } }, - ] } } }, + { 'array': [ + { 'string': 'bar' }, + ] }, ] } }, }, { label: 'Array with nulls', valid: true, delta_data: { foo: { 'array': [ - { 'bucket': { 'map': { 'string': 'bar' } } }, - { 'bucket': { 'map': { 'string': 'baz' } } }, - { 'bucket': { 'map': null } }, + { 'string': 'bar' }, + { 'string': 'baz' }, + null, ] } }, }, { label: 'Nested Array with mixed values', valid: true, delta_data: { foo: { 'array': [ - { 'bucket': { 'map': { 'array': [ - { 'bucket': { 'map': { 'string': 'bar' } } }, - { 'bucket': { 'map': { 'double': 123321 } } }, - { 'bucket': { 'map': null } }, - ] } } } + { 'array': [ + { 'string': 'bar' }, + { 'double': 123321 }, + null, + ] } ] } }, }, { @@ -168,180 +166,14 @@ describe( 'system.V1MessageWriter', () => label: 'Map objects', valid: true, delta_data: { 'foo': { 'array': [ - { 'bucket': { 'map': { 'map': { 'bar': - { 'bucket': { 'map': { 'map': { 'baz': - { 'bucket': { 'map': { 'double': 1572903485000 } } } - } } } } - } } } } + { 'map': { + 'bar': { 'map': { + 'baz': { 'double': 1572903485000 }, + } } + } } ] } }, - }, - { - label: 'Arbitrary array/map depth', - valid: true, - delta_data: { - "a": { "array": [ - { "bucket": { "map": { "map": { - "b": { "bucket": { "map": { "array": [ - { "bucket": { "map": { - "string": "c" - } } }, - { "bucket": { "map": { "array": [ - { "bucket": { "map": { - "array": [ - { "bucket": { "map": { - "string": "d" - } } }, - { "bucket": { "map": { - "map": { - "e": { "bucket": { "map": { "string": "f" } } }, - "g": { "bucket": { "map": { "string": "h" } } }, - "i": { "bucket": { "map": { "string": "j" } } }, - "k": { "bucket": { "map": { "string": "l" } } }, - "m": { "bucket": { "map": { "string": "n" } } } - } - } } }, - { "bucket": { "map": { - "array": [ - { "bucket": { "map": { - "array": [ - { "bucket": { "map": { - "string": "o" - } } }, - { "bucket": { "map": { - "map": { - "p": { "bucket": { "map": { - "string": "q" - } } }, - "r": { "bucket": { "map": { - "string": "s" - } } }, - "t": { "bucket": { "map": { - "string": "u" - } } } - } - } } }, - { "bucket": { "map": { "array": [] } } }, - { "bucket": { "map": null } } - ] - } } }, - { "bucket": { "map": { - "array": [ - { "bucket": { "map": { - "string": "v" - } } }, - { "bucket": { "map": { - "map": { - "w": { "bucket": { "map": { "string": "x" } } }, - "y": { "bucket": { "map": { "string": "z" } } } - } - } } }, - { - "bucket": { - "map": { - "array": [ - { "bucket": { "map": { - "array": [ - { "bucket": { "map": { - "string": "aa" - } } }, - { "bucket": { "map": { - "map": { - "ab": { - "bucket": { - "map": { - "string": "ac" - } - } - }, - "ad": { - "bucket": { - "map": { - "string": "ae" - } - } - }, - "af": { - "bucket": { - "map": { - "string": "ag" - } - } - }, - "ah": { - "bucket": { - "map": { - "string": "ai" - } - } - } - } - } } }, - { "bucket": { "map": { - "array": [] - } } }, - { "bucket": { "map": null } } - ] - } } }, - { "bucket": { "map": { - "array": [ - { "bucket": { "map": { - "string": "aj" - } } }, - { "bucket": { "map": { - "map": { - "ak": { - "bucket": { "map": { - "string": "al" - } } - }, - "am": { - "bucket": { "map": { - "string": "an" - } } - }, - "ao": { - "bucket": { "map": { - "string": "ap" - } } - } - } - } } }, - { "bucket": { "map": { - "array": [] - } } }, - { "bucket": { "map": { - "array": [ - { "bucket": { "map": { - "string": "q" - } } } - ] - } } } - ] - } } } - ] - } - } - }, - { "bucket": { "map": { - "string": "" - } } } - ] - } } } - ] - } } }, - { "bucket": { "map": { - "string": "" - } } } - ] - } } } - ] } } }, - { "bucket": { "map": null } }, - { "bucket": { "map": { "boolean": false } } } - ] } } } - } } } } - ] } - }, - }, + } + ].forEach( ( { label, delta_data, valid, expected } ) => { it( label, () => @@ -408,7 +240,7 @@ describe( 'system.V1MessageWriter', () => label: 'Boolean Value', delta_data: { foo: [ true ] }, expected: { foo: { 'array': [ - { 'bucket': { 'map': { 'boolean': true } } }, + { 'boolean': true }, ] } }, }, { @@ -418,8 +250,8 @@ describe( 'system.V1MessageWriter', () => 'baz', ] }, expected: { foo: { 'array': [ - { 'bucket': { 'map': { 'string': 'bar' } } }, - { 'bucket': { 'map': { 'string': 'baz' } } }, + { 'string': 'bar' }, + { 'string': 'baz' }, ] } }, }, { @@ -428,7 +260,7 @@ describe( 'system.V1MessageWriter', () => 123 ] }, expected: { foo: { 'array': [ - { 'bucket': { 'map': { 'double': 123 } } }, + { 'double': 123 }, ] } }, }, { @@ -440,10 +272,10 @@ describe( 'system.V1MessageWriter', () => ] ] }, expected: { foo: { 'array': [ - { 'bucket': { 'map': { 'array': [ - { 'bucket': { 'map': { 'string': 'bar' } } }, - { 'bucket': { 'map': { 'string': 'baz' } } }, - ] } } }, + { 'array': [ + { 'string': 'bar' }, + { 'string': 'baz' }, + ] }, ] } }, }, { @@ -458,13 +290,13 @@ describe( 'system.V1MessageWriter', () => ], ] }, expected: { foo: { 'array': [ - { 'bucket': { 'map': { 'array': [ - { 'bucket': { 'map': { 'array': [ - { 'bucket': { 'map': { 'string': 'bar' } } }, - { 'bucket': { 'map': { 'double': 123 } } }, - { 'bucket': { 'map': null } }, - ] } } }, - ] } } }, + { 'array': [ + { 'array': [ + { 'string': 'bar' }, + { 'double': 123 }, + null, + ] }, + ] }, ] } }, }, { @@ -475,13 +307,13 @@ describe( 'system.V1MessageWriter', () => null ] }, expected: { foo: { 'array': [ - { 'bucket': { 'map': { 'string': 'bar' } } }, - { 'bucket': { 'map': { 'string': 'baz' } } }, - { 'bucket': { 'map': null } }, + { 'string': 'bar' }, + { 'string': 'baz' }, + null ] } }, }, { - label: 'Nested array with mixed values', + label: 'Nested Array with mixed values', delta_data: { foo: [ [ 'bar', @@ -490,15 +322,15 @@ describe( 'system.V1MessageWriter', () => ] ] }, expected: { foo: { 'array': [ - { 'bucket': { 'map': { 'array': [ - { 'bucket': { 'map': { 'string': 'bar' } } }, - { 'bucket': { 'map': { 'double': 123321 } } }, - { 'bucket': { 'map': null } }, - ] } } } - ] } }, + { 'array': [ + { 'string': 'bar' }, + { 'double': 123321 }, + null, + ] }, + ] } }, }, { - label: 'Nested map with mixed values', + label: 'Nested Array with mixed values', delta_data: { foo: [ { 'bar': { @@ -510,256 +342,16 @@ describe( 'system.V1MessageWriter', () => }, ] }, expected: { 'foo': { 'array': [ - { 'bucket': { 'map': { 'map': { 'bar': - { 'bucket': { 'map': { 'map': { - 'wer': { 'bucket': { 'map': { - 'string': 'qaz' - } } }, - 'qwe': { 'bucket': { 'map': { - 'double': 1572903485000 - } } }, - 'asd': { 'bucket': { 'map': { - 'boolean': true - } } }, - 'zxc': { 'bucket': { 'map': null } } - } } } } - } } } } + { 'map': { + 'bar': { 'map': { + 'wer': { 'string': 'qaz' }, + 'qwe': { 'double': 1572903485000 }, + 'asd': { 'boolean': true }, + 'zxc': null, + } }, + } }, ] } }, }, - { - label: 'Arbitrary array/map depth', - delta_data: { - "a": [ - { - "b": [ - "c", - [ - [ - "d", - { - "e": "f", - "g": "h", - "i": "j", - "k": "l", - "m": "n" - }, - [ - [ - "o", - { - "p": "q", - "r": "s", - "t": "u" - }, - [], - null - ], - [ - "v", - { - "w": "x", - "y": "z" - }, - [ - [ - "aa", - { - "ab": "ac", - "ad": "ae", - "af": "ag", - "ah": "ai" - }, - [], - null - ], - [ - "aj", - { - "ak": "al", - "am": "an", - "ao": "ap" - }, - [], - [ - "q" - ] - ] - ], - "" - ] - ], - "" - ] - ], - null, - false - ], - } ], - }, - expected: { - "a": { "array": [ - { "bucket": { "map": { "map": { - "b": { "bucket": { "map": { "array": [ - { "bucket": { "map": { - "string": "c" - } } }, - { "bucket": { "map": { "array": [ - { "bucket": { "map": { - "array": [ - { "bucket": { "map": { - "string": "d" - } } }, - { "bucket": { "map": { - "map": { - "e": { "bucket": { "map": { "string": "f" } } }, - "g": { "bucket": { "map": { "string": "h" } } }, - "i": { "bucket": { "map": { "string": "j" } } }, - "k": { "bucket": { "map": { "string": "l" } } }, - "m": { "bucket": { "map": { "string": "n" } } } - } - } } }, - { "bucket": { "map": { - "array": [ - { "bucket": { "map": { - "array": [ - { "bucket": { "map": { - "string": "o" - } } }, - { "bucket": { "map": { - "map": { - "p": { "bucket": { "map": { - "string": "q" - } } }, - "r": { "bucket": { "map": { - "string": "s" - } } }, - "t": { "bucket": { "map": { - "string": "u" - } } } - } - } } }, - { "bucket": { "map": { "array": [] } } }, - { "bucket": { "map": null } } - ] - } } }, - { "bucket": { "map": { - "array": [ - { "bucket": { "map": { - "string": "v" - } } }, - { "bucket": { "map": { - "map": { - "w": { "bucket": { "map": { "string": "x" } } }, - "y": { "bucket": { "map": { "string": "z" } } } - } - } } }, - { - "bucket": { - "map": { - "array": [ - { "bucket": { "map": { - "array": [ - { "bucket": { "map": { - "string": "aa" - } } }, - { "bucket": { "map": { - "map": { - "ab": { - "bucket": { - "map": { - "string": "ac" - } - } - }, - "ad": { - "bucket": { - "map": { - "string": "ae" - } - } - }, - "af": { - "bucket": { - "map": { - "string": "ag" - } - } - }, - "ah": { - "bucket": { - "map": { - "string": "ai" - } - } - } - } - } } }, - { "bucket": { "map": { - "array": [] - } } }, - { "bucket": { "map": null } } - ] - } } }, - { "bucket": { "map": { - "array": [ - { "bucket": { "map": { - "string": "aj" - } } }, - { "bucket": { "map": { - "map": { - "ak": { - "bucket": { "map": { - "string": "al" - } } - }, - "am": { - "bucket": { "map": { - "string": "an" - } } - }, - "ao": { - "bucket": { "map": { - "string": "ap" - } } - } - } - } } }, - { "bucket": { "map": { - "array": [] - } } }, - { "bucket": { "map": { - "array": [ - { "bucket": { "map": { - "string": "q" - } } } - ] - } } } - ] - } } } - ] - } - } - }, - { "bucket": { "map": { - "string": "" - } } } - ] - } } } - ] - } } }, - { "bucket": { "map": { - "string": "" - } } } - ] - } } } - ] } } }, - { "bucket": { "map": null } }, - { "bucket": { "map": { "boolean": false } } } - ] } } } - } } } } - ] } - }, - }, ].forEach( ( { label, delta_data, expected } ) => { it( label, () => @@ -780,11 +372,13 @@ describe( 'system.V1MessageWriter', () => } ); + it( 'Message is formatted correctly', () => { const bucket = { foo: [ 'bar', 'baz' ] }; const ratedata = {}; const doc_id = 123; + const program = 'mega'; const entity_name = 'Some Agency'; const entity_id = 123; const startDate = 345; @@ -794,6 +388,7 @@ describe( 'system.V1MessageWriter', () => const encoder = createMockEncoderCtor( schema ); const meta = { id: doc_id, + program: program, entity_name: entity_name, entity_id: entity_id, startDate: startDate, @@ -832,8 +427,8 @@ describe( 'system.V1MessageWriter', () => Data: { bucket: { 'foo': { 'array': [ - { 'bucket': { 'map': { 'string': 'bar' } } }, - { 'bucket': { 'map': { 'string': 'baz' } } }, + { 'string': 'bar' }, + { 'string': 'baz' }, ] } }, }, @@ -850,7 +445,7 @@ describe( 'system.V1MessageWriter', () => }, program: { Program: { - id: 'quote_server', + id: program, version: '', }, },