From b6f72b4b09ee298445903bfa6b5cf564cbbecb06 Mon Sep 17 00:00:00 2001 From: Austin Schaffer Date: Fri, 13 Dec 2019 17:39:52 -0500 Subject: [PATCH] [DEV-5312] Create additional indexes and process less frequently --- .env | 8 ++++- bin/delta-processor.ts | 2 +- src/system/PrometheusFactory.ts | 2 +- src/system/avro/V1MessageWriter.ts | 6 ++-- src/system/db/MongoFactory.ts | 53 +++++++++++++++++++----------- test/system/V1MessageWriterTest.ts | 8 +++-- 6 files changed, 52 insertions(+), 27 deletions(-) diff --git a/.env b/.env index 21f018c..b1bed50 100644 --- a/.env +++ b/.env @@ -14,4 +14,10 @@ PROM_PUSH_INTERVAL_MS=5000 PROM_BUCKETS_START=0 PROM_BUCKETS_WIDTH=10 PROM_BUCKETS_COUNT=10 -PROCESS_INTERVAL_MS=2000 +LIZA_MONGODB_HA= +LIZA_MONGODB_REPLSET= +LIZA_MONGODB_HOST_A= +LIZA_MONGODB_PORT_A= +LIZA_MONGODB_HOST_B= +LIZA_MONGODB_PORT_B= +PROCESS_INTERVAL_MS=2000 \ No newline at end of file diff --git a/bin/delta-processor.ts b/bin/delta-processor.ts index 83e42e3..3fd6cb7 100644 --- a/bin/delta-processor.ts +++ b/bin/delta-processor.ts @@ -48,7 +48,7 @@ const amqp_conf = createAmqpConfig( process.env ); const prom_conf = createPrometheusConfig( process.env ); const db_conf = createMongoConfig( process.env ); const db = createMongoDB( db_conf ); -const process_interval_ms = +( process.env.process_interval_ms || 2000 ); +const process_interval_ms = +( process.env.process_interval_ms || 10000 ); const env = process.env.NODE_ENV || 'Unknown Environment'; const emitter = new EventEmitter(); const log = new StandardLogger( console, ts_ctr, env ); diff --git a/src/system/PrometheusFactory.ts b/src/system/PrometheusFactory.ts index 7330ea9..3f5aad5 100644 --- a/src/system/PrometheusFactory.ts +++ b/src/system/PrometheusFactory.ts @@ -62,7 +62,7 @@ export function createPrometheusConfig( hostname: env.PROM_HOST, port: +( env.PROM_PORT || 0 ), env: process.env.NODE_ENV, - push_interval_ms: +( process.env.PROM_PUSH_INTERVAL_MS || 5000 ), + push_interval_ms: +( process.env.PROM_PUSH_INTERVAL_MS || 10000 ), buckets_start: +( process.env.PROM_BUCKETS_START || 0 ), buckets_width: +( process.env.PROM_BUCKETS_WIDTH || 10 ), buckets_count: +( process.env.PROM_BUCKETS_COUNT || 10 ), diff --git a/src/system/avro/V1MessageWriter.ts b/src/system/avro/V1MessageWriter.ts index 09ee64a..6ff0f2a 100644 --- a/src/system/avro/V1MessageWriter.ts +++ b/src/system/avro/V1MessageWriter.ts @@ -101,6 +101,8 @@ export class V1MessageWriter implements MessageWriter const bucket_formatted = this.setDataTypes( bucket ); const ratedata_formatted = this.setDataTypes( ratedata ); const event_id = this.DELTA_MAP[ delta.type ]; + const start_date_ms = { "long": meta.startDate * 1000 }; + const last_update_ms = { "long": meta.lastUpdate * 1000 }; return { event: { @@ -111,8 +113,8 @@ export class V1MessageWriter implements MessageWriter }, document: { id: meta.id, - created: meta.startDate, - modified: meta.lastUpdate, + created: start_date_ms, + modified: last_update_ms, }, session: { Session: { diff --git a/src/system/db/MongoFactory.ts b/src/system/db/MongoFactory.ts index 5a3b03e..f0ee431 100644 --- a/src/system/db/MongoFactory.ts +++ b/src/system/db/MongoFactory.ts @@ -29,9 +29,9 @@ import { DaoError } from '../../error/DaoError'; const { - Db: MongoDb, - Server: MongoServer, - ReplServers: ReplSetServers, + Db: MongoDb, + Server: MongoServer, + ReplSetServers: ReplSetServers, } = require( 'mongodb' ); @@ -148,29 +148,42 @@ export function getMongoCollection( return; } - // initialize indexes - collection.createIndex( - [ - ['published', 1], - ['deltaError', 1], - ], - true, - ( e: any, _index: { [P: string]: any } ) => + let createdCount = 0 + const checkAllCreated = (): void => + { + if( createdCount >= 3 ) { - if ( e ) - { - reject( new DaoError( - 'Error creating index: ' + e - ) ); - return; - } - resolve( collection ); + } + }; + + const cb = ( e: any, _index: { [P: string]: any } ): void => + { + if ( e ) + { + reject( new DaoError( + 'Error creating index: ' + e + ) ); return; } + + createdCount++; + checkAllCreated(); + }; + + // initialize indexes + collection.createIndex( [ ['published', 1] ], false, cb ); + collection.createIndex( [ ['deltaError', 1] ], false, cb); + collection.createIndex( + [ + [ 'published', 1 ], + [ 'deltaError', 1 ], + ], + false, + cb ); } ); } ); } ); -} \ No newline at end of file +} diff --git a/test/system/V1MessageWriterTest.ts b/test/system/V1MessageWriterTest.ts index 271d735..55c64ab 100644 --- a/test/system/V1MessageWriterTest.ts +++ b/test/system/V1MessageWriterTest.ts @@ -371,8 +371,12 @@ describe( 'system.V1MessageWriter', () => }, document: { id: doc_id, - created: startDate, - modified: lastUpdate, + created: { + 'long': startDate * 1000 + }, + modified: { + 'long': lastUpdate * 1000 + }, }, session: { Session: {