1
0
Fork 0

[DEV-5312] Add preliminary processor

master
Austin Schaffer 2019-11-12 15:07:37 -05:00 committed by Mike Gerwitz
parent 91a7cf94b2
commit 950ae8818b
6 changed files with 481 additions and 173 deletions

View File

@ -81,7 +81,7 @@ export class MongoServerDao extends EventEmitter implements ServerDao
* *
* @return MongoServerDao self to allow for method chaining * @return MongoServerDao self to allow for method chaining
*/ */
init( callback: () => {} ): this init( callback: () => void ): this
{ {
var dao = this; var dao = this;
@ -109,7 +109,7 @@ export class MongoServerDao extends EventEmitter implements ServerDao
* *
* @return MongoServerDao self to allow for method chaining * @return MongoServerDao self to allow for method chaining
*/ */
connect( callback: () => {} ): this connect( callback: () => void ): this
{ {
var dao = this; var dao = this;

View File

@ -0,0 +1,177 @@
/**
* Delta Processor
*
* Copyright (C) 2010-2019 R-T Specialty, LLC.
*
* This file is part of the Liza Data Collection Framework.
*
* liza is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
import { DeltaDao } from "../system/db/DeltaDao";
import { MongoDeltaType } from "../system/db/MongoDeltaDao";
import { DeltaResult } from "../bucket/delta";
import { DocumentId } from "../document/Document";
/**
* Process deltas for a quote and publish to a queue
*/
export class DeltaProcessor
{
/** The ratedata delta type */
readonly DELTA_RATEDATA: MongoDeltaType = 'ratedata';
/** The data delta type */
readonly DELTA_DATA: MongoDeltaType = 'data';
/** A mapping of which delta type translated to which avro event */
readonly DELTA_MAP: Record<string, string> = {
DELTA_RATEDATA: 'rate',
DELTA_DATA: 'update',
};
/**
* Initialize processor
*
* @param _collection Mongo collection
*/
constructor(
private readonly _dao: DeltaDao,
) {}
/**
* Process unpublished deltas
*/
process(): void
{
let self = this;
this._dao.getUnprocessedDocuments( function( docs )
{
docs.forEach( doc => {
const deltas = self.getTimestampSortedDeltas( doc );
deltas.forEach( delta => {
// TODO: publish delta
// publisher.publish( delta, self.DELTA_MAP[ delta.type ] )
console.log( delta, self.DELTA_MAP[ delta.type ] );
});
const last_updated_ts = doc.lastUpdated;
const doc_id: DocumentId = doc.id;
self._dao.markDocumentAsProcessed(
doc_id,
last_updated_ts,
function( err, markedSuccessfully )
{
console.log( err, markedSuccessfully );
},
);
});
});
}
/**
* Get sorted list of deltas
*
* @param doc - the document
*
* @return a list of deltas sorted by timestamp
*/
getTimestampSortedDeltas(
doc: any,
): DeltaResult<any>[]
{
const data_deltas = this.getDeltas( doc, this.DELTA_RATEDATA );
const ratedata_deltas = this.getDeltas( doc, this.DELTA_DATA );
const deltas = data_deltas.concat( ratedata_deltas );
deltas.sort( this._sortByTimestamp );
return deltas;
}
/**
* Get trimmed delta list
*
* @param doc - the document
* @param type - the delta type to get
*
* @return a trimmed list of deltas
*/
getDeltas(
doc: any,
type: MongoDeltaType,
): DeltaResult<any>[]
{
// Get objects so we can get the index by type
const deltas_obj = doc.rdelta || {};
// Get type specific deltas
let last_published_index = 0;
if ( doc.lastPublishDelta )
{
const last_published_indexes = doc.lastPublishDelta;
last_published_index = last_published_indexes[ type ] || 0;
}
const deltas: DeltaResult<any>[] = deltas_obj[ type ] || [];
// Only return the unprocessed deltas
const deltas_trimmed = deltas.slice( last_published_index );
// Mark each delta with its type
deltas_trimmed.forEach( delta => {
delta.type = type;
});
return deltas_trimmed;
}
/**
* Sort an array of deltas by timestamp
*
* @param a - The first delta to compare
* @param a - The second delta to compare
*
* @return a sort value
*/
private _sortByTimestamp(
a: DeltaResult<any>,
b: DeltaResult<any>,
): number
{
if ( a.timestamp < b.timestamp )
{
return -1;
}
if ( a.timestamp > b.timestamp ) {
return 1;
}
return 0;
}
}

View File

@ -40,7 +40,7 @@ export interface DeltaDao
* @return documents in need of processing * @return documents in need of processing
*/ */
getUnprocessedDocuments( getUnprocessedDocuments(
callback: ( data: Record<string, any> | null ) => void callback: ( data: Record<string, any>[] ) => void,
): this; ): this;

View File

@ -56,7 +56,7 @@ export class MongoDeltaDao implements DeltaDao
* @return documents in need of processing * @return documents in need of processing
*/ */
getUnprocessedDocuments( getUnprocessedDocuments(
callback: ( data: Record<string, any> | null ) => void, callback: ( data: Record<string, any>[] ) => void,
): this ): this
{ {
var self = this; var self = this;
@ -71,7 +71,7 @@ export class MongoDeltaDao implements DeltaDao
// was the quote found? // was the quote found?
if ( data.length == 0 ) if ( data.length == 0 )
{ {
callback.call( self, null ); callback.call( self, [] );
return; return;
} }

View File

@ -1,168 +0,0 @@
/**
* Token state management test
*
* Copyright (C) 2010-2019 R-T Specialty, LLC.
*
* This file is part of the Liza Data Collection Framework.
*
* liza is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
import { MongoDeltaDao as Sut } from "../../../src/server/db/MongoDeltaDao";
import { MongoCollection } from "mongodb";
import { PositiveInteger } from "../../../src/numeric";
import { DocumentId } from "../../../src/document/Document";
import { expect, use as chai_use } from 'chai';
chai_use( require( 'chai-as-promised' ) );
describe( 'server.db.MongoDeltaDao', () =>
{
describe( '#getUnprocessedDocuments', () =>
{
it( 'gets documents', () =>
{
let returned_data = { foo: 'bar' };
let callback_called = false;
const collection = _createMockMongoCollection();
collection.find = ( _: any, __: any, c: any ) =>
{
c( null, {
toArray: ( c: any ) => c( null, returned_data ),
} );
};
const callback = ( data: Record<string, any> | null ) =>
{
expect( returned_data ).to.deep.equal( data );
callback_called = true;
return;
};
new Sut( collection ).getUnprocessedDocuments( callback );
expect( callback_called ).to.equal( true );
});
});
describe( '#advanceDeltaIndexByType', () =>
{
it( 'advances specified index', () =>
{
const quote_id = <DocumentId>123,
delta_type = 'ratedata',
expected_field = 'lastPublishDelta.ratedata',
index = <PositiveInteger>1;
let callback_called = false;
const collection = _createMockMongoCollection();
collection.update = (
given_quote_id: any,
given_delta_type: any,
_given_index: any,
given_callback: any
) =>
{
const expected_set: Record<string, any> = {};
expected_set[ expected_field ] = index
expect( given_quote_id ).to.deep.equal( { id: quote_id } );
expect( given_delta_type )
.to.deep.equal( { $set: expected_set } );
given_callback( null );
};
const callback = ( _err: NullableError, _indexAdvanced: boolean ) =>
{
callback_called = true;
return;
};
new Sut( collection ).advanceDeltaIndexByType(
quote_id,
delta_type,
index,
callback,
);
expect( callback_called ).to.equal( true );
});
});
describe( '#markDocumentAsProcessed', () =>
{
it( 'doc marked if provided timestamp <= doc timestamp', () =>
{
const quote_id = <DocumentId>123,
last_update_ts = <UnixTimestamp>1573582767;
let callback_called = false;
const collection = _createMockMongoCollection();
collection.update = (
given_filter: any,
_: any,
__: any,
given_callback: any
) =>
{
const expected_filter: Record<string, any> = {
id: quote_id,
lastUpdate: { $gt: last_update_ts }
};
expect( given_filter ).to.deep.equal( expected_filter );
given_callback( null );
};
const callback = ( _err: NullableError, _indexAdvanced: boolean ) =>
{
callback_called = true;
return;
};
new Sut( collection ).markDocumentAsProcessed(
quote_id,
last_update_ts,
callback,
);
expect( callback_called ).to.equal( true );
});
});
} );
function _createMockMongoCollection(): MongoCollection
{
return <MongoCollection> {
findOne() {},
update() {},
findAndModify() {},
find() {},
createIndex() {},
insert() {},
};
}

View File

@ -0,0 +1,299 @@
/**
* Delta Processor test
*
* Copyright (C) 2010-2019 R-T Specialty, LLC.
*
* This file is part of the Liza Data Collection Framework.
*
* liza is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
import { DeltaProcessor as Sut } from '../../src/system/DeltaProcessor';
import { DeltaDao } from '../../src/system/db/DeltaDao';
import { MongoDeltaType } from '../../src/system/db/MongoDeltaDao';
import { expect, use as chai_use } from 'chai';
chai_use( require( 'chai-as-promised' ) );
describe( 'system.DeltaProcessor', () =>
{
describe( '#getTimestampSortedDeltas', () =>
{
( <{ label: string, given: any, expected: any }[]>[
{
label: 'creates list',
given: {
rdelta: {
data: [
{
data: { foo: 'first_bar' },
timestamp: 123,
},
{
data: { foo: 'second_bar' },
timestamp: 234,
},
],
ratedata: [
{
data: { foo: 'third_bar' },
timestamp: 345,
},
{
data: { foo: 'fourth_bar' },
timestamp: 456,
},
]
}
},
expected: [
{
data: { foo: 'first_bar' },
timestamp: 123,
type: 'data',
},
{
data: { foo: 'second_bar' },
timestamp: 234,
type: 'data',
},
{
data: { foo: 'third_bar' },
timestamp: 345,
type: 'ratedata',
},
{
data: { foo: 'fourth_bar' },
timestamp: 456,
type: 'ratedata',
},
],
},
{
label: 'creates list with no ratedata',
given: {
rdelta: {
data: [
{
data: { foo: 'first_bar' },
timestamp: 123,
},
{
data: { foo: 'second_bar' },
timestamp: 234,
},
],
ratedata: []
}
},
expected: [
{
data: { foo: 'first_bar' },
timestamp: 123,
type: 'data',
},
{
data: { foo: 'second_bar' },
timestamp: 234,
type: 'data',
},
],
},
{
label: 'creates list when rate occurs between two saves',
given: {
rdelta: {
data: [
{
data: { foo: 'first_bar' },
timestamp: 123,
},
{
data: { foo: 'second_bar' },
timestamp: 234,
},
{
data: { foo: 'fourth_bar' },
timestamp: 456,
},
],
ratedata: [
{
data: { foo: 'third_bar' },
timestamp: 345,
},
],
},
},
expected: [
{
data: { foo: 'first_bar' },
timestamp: 123,
type: 'data',
},
{
data: { foo: 'second_bar' },
timestamp: 234,
type: 'data',
},
{
data: { foo: 'third_bar' },
timestamp: 345,
type: 'ratedata',
},
{
data: { foo: 'fourth_bar' },
timestamp: 456,
type: 'data',
},
],
},
] ).forEach( ( { given, expected, label } ) => it( label, () =>
{
const sut = new Sut( createMockDeltaDao() );
const actual = sut.getTimestampSortedDeltas( given );
expect( actual ).to.deep.equal( expected );
} ) );
} );
describe( '#getDeltas', () =>
{
( <{
label: string,
type: MongoDeltaType,
given: any,
expected: any
}[]>[
{
label: 'return empty array if no deltas are present',
type: 'data',
given: {
rdelta: {},
},
expected: [],
},
{
label: 'return full list if no lastPublished index is found',
type: 'data',
given: {
rdelta: {
data: [
{
data: { foo: 'first_bar' },
timestamp: 123,
},
{
data: { foo: 'second_bar' },
timestamp: 234,
},
],
},
},
expected: [
{
data: { foo: 'first_bar' },
timestamp: 123,
type: 'data',
},
{
data: { foo: 'second_bar' },
timestamp: 234,
type: 'data',
},
],
},
{
label: 'marks deltas with their type',
type: 'data',
given: {
rdelta: {
data: [
{
data: { foo: 'first_bar' },
timestamp: 123,
},
{
data: { foo: 'second_bar' },
timestamp: 234,
},
],
},
lastPublishDelta: {
data: 0,
},
},
expected: [
{
data: { foo: 'first_bar' },
timestamp: 123,
type: 'data',
},
{
data: { foo: 'second_bar' },
timestamp: 234,
type: 'data',
},
],
},
{
label: 'trims delta array based on index',
type: 'data',
given: {
rdelta: {
data: [
{
data: { foo: 'first_bar' },
timestamp: 123,
},
{
data: { foo: 'second_bar' },
timestamp: 234,
},
],
},
lastPublishDelta: {
data: 1,
},
},
expected: [
{
data: { foo: 'second_bar' },
timestamp: 234,
type: 'data',
},
],
},
] ).forEach( ( { type, given, expected, label } ) => it( label, () =>
{
const sut = new Sut( createMockDeltaDao() );
const actual = sut.getDeltas( given, type );
expect( actual ).to.deep.equal( expected );
} ) );
} );
} );
function createMockDeltaDao(): DeltaDao
{
return <DeltaDao>{
getUnprocessedDocuments() { return this },
advanceDeltaIndexByType() { return this },
markDocumentAsProcessed() { return this },
};
}