1
0
Fork 0

[DEV-5312] Add dao for deltas

master
Austin Schaffer 2019-11-12 13:24:41 -05:00 committed by Mike Gerwitz
parent d0b2a4ce73
commit 91a7cf94b2
4 changed files with 422 additions and 1 deletions

View File

@ -0,0 +1,76 @@
/**
* Delta data access
*
* Copyright (C) 2010-2019 R-T Specialty, LLC.
*
* This file is part of the Liza Data Collection Framework.
*
* liza is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* These types are used to describe the structure of the token data as it
* is stored in Mongo. It has a number of undesirable properties and
* duplicates data---this was intended to make querying easier and work
* around Mongo limitations.
*
* This structure can be changed in the future, but we'll need to maintain
* compatibility with the existing data.
*/
import { DocumentId } from "../../document/Document";
import { PositiveInteger } from "../../numeric";
/** Manage deltas */
export interface DeltaDao
{
/**
* Get documents in need of processing
*
* @return documents in need of processing
*/
getUnprocessedDocuments(
callback: ( data: Record<string, any> | null ) => void
): this;
/**
* Set the document's processed index
*
* @param doc_id - The document whose index will be set
* @param index - The index to set
*/
advanceDeltaIndexByType(
doc_id: DocumentId,
type: string,
index: PositiveInteger,
callback: ( err: NullableError, indexHasAdvanced: boolean ) => void,
): this;
/**
* Mark a given document as processed. First does a check to make sure that
* the document does not have a newer update timestamp than the provided one
*
* @param doc_id - The document to mark
* @param last_update_ts - The last time this document was updated
*
* @return true if the document was successfully marked as processed
*/
markDocumentAsProcessed(
doc_id: DocumentId,
last_update_ts: UnixTimestamp,
callback: ( err: NullableError, markedSuccessfully: boolean ) => void,
): this;
}

View File

@ -0,0 +1,174 @@
/**
* Delta data access
*
* Copyright (C) 2010-2019 R-T Specialty, LLC.
*
* This file is part of the Liza Data Collection Framework.
*
* liza is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* Get deltas from the mongo document in order to process and publish them
*/
import { DocumentId } from "../../document/Document";
import { PositiveInteger } from "../../numeric";
import { MongoCollection } from "mongodb";
import { DeltaDao } from "./DeltaDao";
export type MongoDeltaType = 'ratedata' | 'data';
/** Manage deltas */
export class MongoDeltaDao implements DeltaDao
{
/** The ratedata delta type */
static readonly DELTA_RATEDATA: string = 'ratedata';
/** The data delta type */
static readonly DELTA_DATA: string = 'data';
/**
* Initialize connection
*
* @param _collection Mongo collection
*/
constructor(
private readonly _collection: MongoCollection,
) {}
/**
* Get documents in need of processing
*
* @return documents in need of processing
*/
getUnprocessedDocuments(
callback: ( data: Record<string, any> | null ) => void,
): this
{
var self = this;
this._collection.find(
{ published: false },
{},
function( _err, cursor )
{
cursor.toArray( function( _err: NullableError, data: any[] )
{
// was the quote found?
if ( data.length == 0 )
{
callback.call( self, null );
return;
}
// return the quote data
callback.call( self, data );
});
}
)
return this;
}
/**
* Set the document's processed index
*
* @param doc_id - Document whose index will be set
* @param type - Delta type
* @param index - Index to set
* @param callback - Callback function
*/
advanceDeltaIndexByType(
doc_id: DocumentId,
type: MongoDeltaType,
index: PositiveInteger,
callback: ( err: NullableError, indexAdvanced: boolean ) => void,
): this
{
var self = this;
const set_data: Record<string, any> = {};
set_data[ 'lastPublishDelta.' + type ] = index;
this._collection.update(
{ id: doc_id },
{ $set: set_data },
{ upsert: true },
function( err )
{
if ( err )
{
callback.call( self, err, false );
return;
}
callback.call( self, null, true );
return;
}
);
return this;
}
/**
* Mark a given document as processed. First does a check to make sure that
* the document does not have a newer update timestamp than the provided one
*
* @param doc_id - The document to mark
* @param last_update_ts - The last time this document was updated
*
* @return true if the document was successfully marked as processed
*/
markDocumentAsProcessed(
doc_id: DocumentId,
last_update_ts: UnixTimestamp,
callback: ( err: NullableError, indexAdvanced: boolean ) => void,
): this
{
var self = this;
this._collection.update(
{ id: doc_id, lastUpdate: { $gt: last_update_ts } },
{ $set: { processed: true } },
{ upsert: false },
function( err, result )
{
if ( err )
{
callback.call( self, err, false );
return;
}
console.log( '-------', result );
callback.call( self, null, true );
return;
}
);
return this;
}
}

View File

@ -69,6 +69,9 @@ interface MongoFindOptions
{
/** Limit results returned */
limit?: PositiveInteger,
/** Whether to project only id's */
id?: number,
}
@ -148,7 +151,7 @@ declare interface MongoCollection
/**
* Execute a query and return the first result
* Execute a query and return the results
*
* Unlike `update`, the callback return value is not propagated, and so
* the callback ought not return anything.

View File

@ -0,0 +1,168 @@
/**
* Token state management test
*
* Copyright (C) 2010-2019 R-T Specialty, LLC.
*
* This file is part of the Liza Data Collection Framework.
*
* liza is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
import { MongoDeltaDao as Sut } from "../../../src/server/db/MongoDeltaDao";
import { MongoCollection } from "mongodb";
import { PositiveInteger } from "../../../src/numeric";
import { DocumentId } from "../../../src/document/Document";
import { expect, use as chai_use } from 'chai';
chai_use( require( 'chai-as-promised' ) );
describe( 'server.db.MongoDeltaDao', () =>
{
describe( '#getUnprocessedDocuments', () =>
{
it( 'gets documents', () =>
{
let returned_data = { foo: 'bar' };
let callback_called = false;
const collection = _createMockMongoCollection();
collection.find = ( _: any, __: any, c: any ) =>
{
c( null, {
toArray: ( c: any ) => c( null, returned_data ),
} );
};
const callback = ( data: Record<string, any> | null ) =>
{
expect( returned_data ).to.deep.equal( data );
callback_called = true;
return;
};
new Sut( collection ).getUnprocessedDocuments( callback );
expect( callback_called ).to.equal( true );
});
});
describe( '#advanceDeltaIndexByType', () =>
{
it( 'advances specified index', () =>
{
const quote_id = <DocumentId>123,
delta_type = 'ratedata',
expected_field = 'lastPublishDelta.ratedata',
index = <PositiveInteger>1;
let callback_called = false;
const collection = _createMockMongoCollection();
collection.update = (
given_quote_id: any,
given_delta_type: any,
_given_index: any,
given_callback: any
) =>
{
const expected_set: Record<string, any> = {};
expected_set[ expected_field ] = index
expect( given_quote_id ).to.deep.equal( { id: quote_id } );
expect( given_delta_type )
.to.deep.equal( { $set: expected_set } );
given_callback( null );
};
const callback = ( _err: NullableError, _indexAdvanced: boolean ) =>
{
callback_called = true;
return;
};
new Sut( collection ).advanceDeltaIndexByType(
quote_id,
delta_type,
index,
callback,
);
expect( callback_called ).to.equal( true );
});
});
describe( '#markDocumentAsProcessed', () =>
{
it( 'doc marked if provided timestamp <= doc timestamp', () =>
{
const quote_id = <DocumentId>123,
last_update_ts = <UnixTimestamp>1573582767;
let callback_called = false;
const collection = _createMockMongoCollection();
collection.update = (
given_filter: any,
_: any,
__: any,
given_callback: any
) =>
{
const expected_filter: Record<string, any> = {
id: quote_id,
lastUpdate: { $gt: last_update_ts }
};
expect( given_filter ).to.deep.equal( expected_filter );
given_callback( null );
};
const callback = ( _err: NullableError, _indexAdvanced: boolean ) =>
{
callback_called = true;
return;
};
new Sut( collection ).markDocumentAsProcessed(
quote_id,
last_update_ts,
callback,
);
expect( callback_called ).to.equal( true );
});
});
} );
function _createMockMongoCollection(): MongoCollection
{
return <MongoCollection> {
findOne() {},
update() {},
findAndModify() {},
find() {},
createIndex() {},
insert() {},
};
}