diff --git a/src/server/db/MongoServerDao.ts b/src/server/db/MongoServerDao.ts
index 2cbce84..e2597cc 100644
--- a/src/server/db/MongoServerDao.ts
+++ b/src/server/db/MongoServerDao.ts
@@ -81,7 +81,7 @@ export class MongoServerDao extends EventEmitter implements ServerDao
*
* @return MongoServerDao self to allow for method chaining
*/
- init( callback: () => {} ): this
+ init( callback: () => void ): this
{
var dao = this;
@@ -109,7 +109,7 @@ export class MongoServerDao extends EventEmitter implements ServerDao
*
* @return MongoServerDao self to allow for method chaining
*/
- connect( callback: () => {} ): this
+ connect( callback: () => void ): this
{
var dao = this;
diff --git a/src/system/DeltaProcessor.ts b/src/system/DeltaProcessor.ts
new file mode 100644
index 0000000..6103f20
--- /dev/null
+++ b/src/system/DeltaProcessor.ts
@@ -0,0 +1,177 @@
+/**
+ * Delta Processor
+ *
+ * Copyright (C) 2010-2019 R-T Specialty, LLC.
+ *
+ * This file is part of the Liza Data Collection Framework.
+ *
+ * liza is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+import { DeltaDao } from "../system/db/DeltaDao";
+import { MongoDeltaType } from "../system/db/MongoDeltaDao";
+import { DeltaResult } from "../bucket/delta";
+import { DocumentId } from "../document/Document";
+
+
+/**
+ * Process deltas for a quote and publish to a queue
+ */
+export class DeltaProcessor
+{
+ /** The ratedata delta type */
+ readonly DELTA_RATEDATA: MongoDeltaType = 'ratedata';
+
+ /** The data delta type */
+ readonly DELTA_DATA: MongoDeltaType = 'data';
+
+ /** A mapping of which delta type translated to which avro event */
+ readonly DELTA_MAP: Record = {
+ DELTA_RATEDATA: 'rate',
+ DELTA_DATA: 'update',
+ };
+
+
+ /**
+ * Initialize processor
+ *
+ * @param _collection Mongo collection
+ */
+ constructor(
+ private readonly _dao: DeltaDao,
+ ) {}
+
+
+ /**
+ * Process unpublished deltas
+ */
+ process(): void
+ {
+ let self = this;
+
+ this._dao.getUnprocessedDocuments( function( docs )
+ {
+ docs.forEach( doc => {
+
+ const deltas = self.getTimestampSortedDeltas( doc );
+
+ deltas.forEach( delta => {
+
+ // TODO: publish delta
+ // publisher.publish( delta, self.DELTA_MAP[ delta.type ] )
+ console.log( delta, self.DELTA_MAP[ delta.type ] );
+
+ });
+
+ const last_updated_ts = doc.lastUpdated;
+ const doc_id: DocumentId = doc.id;
+
+ self._dao.markDocumentAsProcessed(
+ doc_id,
+ last_updated_ts,
+ function( err, markedSuccessfully )
+ {
+ console.log( err, markedSuccessfully );
+ },
+ );
+ });
+ });
+ }
+
+
+ /**
+ * Get sorted list of deltas
+ *
+ * @param doc - the document
+ *
+ * @return a list of deltas sorted by timestamp
+ */
+ getTimestampSortedDeltas(
+ doc: any,
+ ): DeltaResult[]
+ {
+ const data_deltas = this.getDeltas( doc, this.DELTA_RATEDATA );
+ const ratedata_deltas = this.getDeltas( doc, this.DELTA_DATA );
+ const deltas = data_deltas.concat( ratedata_deltas );
+
+ deltas.sort( this._sortByTimestamp );
+
+ return deltas;
+ }
+
+
+ /**
+ * Get trimmed delta list
+ *
+ * @param doc - the document
+ * @param type - the delta type to get
+ *
+ * @return a trimmed list of deltas
+ */
+ getDeltas(
+ doc: any,
+ type: MongoDeltaType,
+ ): DeltaResult[]
+ {
+ // Get objects so we can get the index by type
+ const deltas_obj = doc.rdelta || {};
+
+ // Get type specific deltas
+ let last_published_index = 0;
+ if ( doc.lastPublishDelta )
+ {
+ const last_published_indexes = doc.lastPublishDelta;
+
+ last_published_index = last_published_indexes[ type ] || 0;
+ }
+
+ const deltas: DeltaResult[] = deltas_obj[ type ] || [];
+
+ // Only return the unprocessed deltas
+ const deltas_trimmed = deltas.slice( last_published_index );
+
+ // Mark each delta with its type
+ deltas_trimmed.forEach( delta => {
+ delta.type = type;
+ });
+
+ return deltas_trimmed;
+ }
+
+
+ /**
+ * Sort an array of deltas by timestamp
+ *
+ * @param a - The first delta to compare
+ * @param a - The second delta to compare
+ *
+ * @return a sort value
+ */
+ private _sortByTimestamp(
+ a: DeltaResult,
+ b: DeltaResult,
+ ): number
+ {
+ if ( a.timestamp < b.timestamp )
+ {
+ return -1;
+ }
+
+ if ( a.timestamp > b.timestamp ) {
+ return 1;
+ }
+
+ return 0;
+ }
+}
\ No newline at end of file
diff --git a/src/server/db/DeltaDao.ts b/src/system/db/DeltaDao.ts
similarity index 97%
rename from src/server/db/DeltaDao.ts
rename to src/system/db/DeltaDao.ts
index 4728d99..53cd8f5 100644
--- a/src/server/db/DeltaDao.ts
+++ b/src/system/db/DeltaDao.ts
@@ -40,7 +40,7 @@ export interface DeltaDao
* @return documents in need of processing
*/
getUnprocessedDocuments(
- callback: ( data: Record | null ) => void
+ callback: ( data: Record[] ) => void,
): this;
diff --git a/src/server/db/MongoDeltaDao.ts b/src/system/db/MongoDeltaDao.ts
similarity index 97%
rename from src/server/db/MongoDeltaDao.ts
rename to src/system/db/MongoDeltaDao.ts
index 4b98ad6..cebf453 100644
--- a/src/server/db/MongoDeltaDao.ts
+++ b/src/system/db/MongoDeltaDao.ts
@@ -56,7 +56,7 @@ export class MongoDeltaDao implements DeltaDao
* @return documents in need of processing
*/
getUnprocessedDocuments(
- callback: ( data: Record | null ) => void,
+ callback: ( data: Record[] ) => void,
): this
{
var self = this;
@@ -71,7 +71,7 @@ export class MongoDeltaDao implements DeltaDao
// was the quote found?
if ( data.length == 0 )
{
- callback.call( self, null );
+ callback.call( self, [] );
return;
}
diff --git a/test/server/db/MongoDeltaDaoTest.ts b/test/server/db/MongoDeltaDaoTest.ts
deleted file mode 100644
index c3d0711..0000000
--- a/test/server/db/MongoDeltaDaoTest.ts
+++ /dev/null
@@ -1,168 +0,0 @@
-/**
- * Token state management test
- *
- * Copyright (C) 2010-2019 R-T Specialty, LLC.
- *
- * This file is part of the Liza Data Collection Framework.
- *
- * liza is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License as
- * published by the Free Software Foundation, either version 3 of the
- * License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see .
- */
-
-import { MongoDeltaDao as Sut } from "../../../src/server/db/MongoDeltaDao";
-import { MongoCollection } from "mongodb";
-import { PositiveInteger } from "../../../src/numeric";
-import { DocumentId } from "../../../src/document/Document";
-
-import { expect, use as chai_use } from 'chai';
-chai_use( require( 'chai-as-promised' ) );
-
-
-describe( 'server.db.MongoDeltaDao', () =>
-{
- describe( '#getUnprocessedDocuments', () =>
- {
- it( 'gets documents', () =>
- {
- let returned_data = { foo: 'bar' };
- let callback_called = false;
-
- const collection = _createMockMongoCollection();
- collection.find = ( _: any, __: any, c: any ) =>
- {
- c( null, {
- toArray: ( c: any ) => c( null, returned_data ),
- } );
- };
-
- const callback = ( data: Record | null ) =>
- {
- expect( returned_data ).to.deep.equal( data );
-
- callback_called = true;
-
- return;
- };
-
- new Sut( collection ).getUnprocessedDocuments( callback );
-
- expect( callback_called ).to.equal( true );
- });
- });
-
- describe( '#advanceDeltaIndexByType', () =>
- {
- it( 'advances specified index', () =>
- {
- const quote_id = 123,
- delta_type = 'ratedata',
- expected_field = 'lastPublishDelta.ratedata',
- index = 1;
-
- let callback_called = false;
-
- const collection = _createMockMongoCollection();
- collection.update = (
- given_quote_id: any,
- given_delta_type: any,
- _given_index: any,
- given_callback: any
- ) =>
- {
- const expected_set: Record = {};
-
- expected_set[ expected_field ] = index
-
- expect( given_quote_id ).to.deep.equal( { id: quote_id } );
- expect( given_delta_type )
- .to.deep.equal( { $set: expected_set } );
-
- given_callback( null );
- };
-
- const callback = ( _err: NullableError, _indexAdvanced: boolean ) =>
- {
- callback_called = true;
-
- return;
- };
-
- new Sut( collection ).advanceDeltaIndexByType(
- quote_id,
- delta_type,
- index,
- callback,
- );
-
- expect( callback_called ).to.equal( true );
- });
- });
-
- describe( '#markDocumentAsProcessed', () =>
- {
- it( 'doc marked if provided timestamp <= doc timestamp', () =>
- {
- const quote_id = 123,
- last_update_ts = 1573582767;
-
- let callback_called = false;
-
- const collection = _createMockMongoCollection();
- collection.update = (
- given_filter: any,
- _: any,
- __: any,
- given_callback: any
- ) =>
- {
- const expected_filter: Record = {
- id: quote_id,
- lastUpdate: { $gt: last_update_ts }
- };
-
- expect( given_filter ).to.deep.equal( expected_filter );
-
- given_callback( null );
- };
-
- const callback = ( _err: NullableError, _indexAdvanced: boolean ) =>
- {
- callback_called = true;
-
- return;
- };
-
- new Sut( collection ).markDocumentAsProcessed(
- quote_id,
- last_update_ts,
- callback,
- );
-
- expect( callback_called ).to.equal( true );
- });
- });
-
-} );
-
-
-function _createMockMongoCollection(): MongoCollection
-{
- return {
- findOne() {},
- update() {},
- findAndModify() {},
- find() {},
- createIndex() {},
- insert() {},
- };
-}
diff --git a/test/system/DeltaProcessorTest.ts b/test/system/DeltaProcessorTest.ts
new file mode 100644
index 0000000..26923ae
--- /dev/null
+++ b/test/system/DeltaProcessorTest.ts
@@ -0,0 +1,299 @@
+/**
+ * Delta Processor test
+ *
+ * Copyright (C) 2010-2019 R-T Specialty, LLC.
+ *
+ * This file is part of the Liza Data Collection Framework.
+ *
+ * liza is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+import { DeltaProcessor as Sut } from '../../src/system/DeltaProcessor';
+import { DeltaDao } from '../../src/system/db/DeltaDao';
+import { MongoDeltaType } from '../../src/system/db/MongoDeltaDao';
+
+
+import { expect, use as chai_use } from 'chai';
+chai_use( require( 'chai-as-promised' ) );
+
+
+describe( 'system.DeltaProcessor', () =>
+{
+ describe( '#getTimestampSortedDeltas', () =>
+ {
+ ( <{ label: string, given: any, expected: any }[]>[
+ {
+ label: 'creates list',
+ given: {
+ rdelta: {
+ data: [
+ {
+ data: { foo: 'first_bar' },
+ timestamp: 123,
+ },
+ {
+ data: { foo: 'second_bar' },
+ timestamp: 234,
+ },
+ ],
+ ratedata: [
+ {
+ data: { foo: 'third_bar' },
+ timestamp: 345,
+ },
+ {
+ data: { foo: 'fourth_bar' },
+ timestamp: 456,
+ },
+ ]
+ }
+ },
+ expected: [
+ {
+ data: { foo: 'first_bar' },
+ timestamp: 123,
+ type: 'data',
+ },
+ {
+ data: { foo: 'second_bar' },
+ timestamp: 234,
+ type: 'data',
+ },
+ {
+ data: { foo: 'third_bar' },
+ timestamp: 345,
+ type: 'ratedata',
+ },
+ {
+ data: { foo: 'fourth_bar' },
+ timestamp: 456,
+ type: 'ratedata',
+ },
+ ],
+ },
+ {
+ label: 'creates list with no ratedata',
+ given: {
+ rdelta: {
+ data: [
+ {
+ data: { foo: 'first_bar' },
+ timestamp: 123,
+ },
+ {
+ data: { foo: 'second_bar' },
+ timestamp: 234,
+ },
+ ],
+ ratedata: []
+ }
+ },
+ expected: [
+ {
+ data: { foo: 'first_bar' },
+ timestamp: 123,
+ type: 'data',
+ },
+ {
+ data: { foo: 'second_bar' },
+ timestamp: 234,
+ type: 'data',
+ },
+ ],
+ },
+ {
+ label: 'creates list when rate occurs between two saves',
+ given: {
+ rdelta: {
+ data: [
+ {
+ data: { foo: 'first_bar' },
+ timestamp: 123,
+ },
+ {
+ data: { foo: 'second_bar' },
+ timestamp: 234,
+ },
+ {
+ data: { foo: 'fourth_bar' },
+ timestamp: 456,
+ },
+ ],
+ ratedata: [
+ {
+ data: { foo: 'third_bar' },
+ timestamp: 345,
+ },
+ ],
+ },
+ },
+ expected: [
+ {
+ data: { foo: 'first_bar' },
+ timestamp: 123,
+ type: 'data',
+ },
+ {
+ data: { foo: 'second_bar' },
+ timestamp: 234,
+ type: 'data',
+ },
+ {
+ data: { foo: 'third_bar' },
+ timestamp: 345,
+ type: 'ratedata',
+ },
+ {
+ data: { foo: 'fourth_bar' },
+ timestamp: 456,
+ type: 'data',
+ },
+ ],
+ },
+ ] ).forEach( ( { given, expected, label } ) => it( label, () =>
+ {
+ const sut = new Sut( createMockDeltaDao() );
+ const actual = sut.getTimestampSortedDeltas( given );
+
+ expect( actual ).to.deep.equal( expected );
+ } ) );
+ } );
+
+
+ describe( '#getDeltas', () =>
+ {
+ ( <{
+ label: string,
+ type: MongoDeltaType,
+ given: any,
+ expected: any
+ }[]>[
+ {
+ label: 'return empty array if no deltas are present',
+ type: 'data',
+ given: {
+ rdelta: {},
+ },
+ expected: [],
+ },
+ {
+ label: 'return full list if no lastPublished index is found',
+ type: 'data',
+ given: {
+ rdelta: {
+ data: [
+ {
+ data: { foo: 'first_bar' },
+ timestamp: 123,
+ },
+ {
+ data: { foo: 'second_bar' },
+ timestamp: 234,
+ },
+ ],
+ },
+ },
+ expected: [
+ {
+ data: { foo: 'first_bar' },
+ timestamp: 123,
+ type: 'data',
+ },
+ {
+ data: { foo: 'second_bar' },
+ timestamp: 234,
+ type: 'data',
+ },
+ ],
+ },
+ {
+ label: 'marks deltas with their type',
+ type: 'data',
+ given: {
+ rdelta: {
+ data: [
+ {
+ data: { foo: 'first_bar' },
+ timestamp: 123,
+ },
+ {
+ data: { foo: 'second_bar' },
+ timestamp: 234,
+ },
+ ],
+ },
+ lastPublishDelta: {
+ data: 0,
+ },
+ },
+ expected: [
+ {
+ data: { foo: 'first_bar' },
+ timestamp: 123,
+ type: 'data',
+ },
+ {
+ data: { foo: 'second_bar' },
+ timestamp: 234,
+ type: 'data',
+ },
+ ],
+ },
+ {
+ label: 'trims delta array based on index',
+ type: 'data',
+ given: {
+ rdelta: {
+ data: [
+ {
+ data: { foo: 'first_bar' },
+ timestamp: 123,
+ },
+ {
+ data: { foo: 'second_bar' },
+ timestamp: 234,
+ },
+ ],
+ },
+ lastPublishDelta: {
+ data: 1,
+ },
+ },
+ expected: [
+ {
+ data: { foo: 'second_bar' },
+ timestamp: 234,
+ type: 'data',
+ },
+ ],
+ },
+ ] ).forEach( ( { type, given, expected, label } ) => it( label, () =>
+ {
+ const sut = new Sut( createMockDeltaDao() );
+ const actual = sut.getDeltas( given, type );
+
+ expect( actual ).to.deep.equal( expected );
+ } ) );
+ } );
+} );
+
+
+function createMockDeltaDao(): DeltaDao
+{
+ return {
+ getUnprocessedDocuments() { return this },
+ advanceDeltaIndexByType() { return this },
+ markDocumentAsProcessed() { return this },
+ };
+}