diff --git a/src/server/db/DeltaDao.ts b/src/server/db/DeltaDao.ts
new file mode 100644
index 0000000..4728d99
--- /dev/null
+++ b/src/server/db/DeltaDao.ts
@@ -0,0 +1,76 @@
+/**
+ * Delta data access
+ *
+ * Copyright (C) 2010-2019 R-T Specialty, LLC.
+ *
+ * This file is part of the Liza Data Collection Framework.
+ *
+ * liza is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ *
+ * These types are used to describe the structure of the token data as it
+ * is stored in Mongo. It has a number of undesirable properties and
+ * duplicates data---this was intended to make querying easier and work
+ * around Mongo limitations.
+ *
+ * This structure can be changed in the future, but we'll need to maintain
+ * compatibility with the existing data.
+ */
+
+import { DocumentId } from "../../document/Document";
+import { PositiveInteger } from "../../numeric";
+
+
+/** Manage deltas */
+export interface DeltaDao
+{
+ /**
+ * Get documents in need of processing
+ *
+ * @return documents in need of processing
+ */
+ getUnprocessedDocuments(
+ callback: ( data: Record | null ) => void
+ ): this;
+
+
+ /**
+ * Set the document's processed index
+ *
+ * @param doc_id - The document whose index will be set
+ * @param index - The index to set
+ */
+ advanceDeltaIndexByType(
+ doc_id: DocumentId,
+ type: string,
+ index: PositiveInteger,
+ callback: ( err: NullableError, indexHasAdvanced: boolean ) => void,
+ ): this;
+
+
+ /**
+ * Mark a given document as processed. First does a check to make sure that
+ * the document does not have a newer update timestamp than the provided one
+ *
+ * @param doc_id - The document to mark
+ * @param last_update_ts - The last time this document was updated
+ *
+ * @return true if the document was successfully marked as processed
+ */
+ markDocumentAsProcessed(
+ doc_id: DocumentId,
+ last_update_ts: UnixTimestamp,
+ callback: ( err: NullableError, markedSuccessfully: boolean ) => void,
+ ): this;
+}
+
diff --git a/src/server/db/MongoDeltaDao.ts b/src/server/db/MongoDeltaDao.ts
new file mode 100644
index 0000000..4b98ad6
--- /dev/null
+++ b/src/server/db/MongoDeltaDao.ts
@@ -0,0 +1,174 @@
+/**
+ * Delta data access
+ *
+ * Copyright (C) 2010-2019 R-T Specialty, LLC.
+ *
+ * This file is part of the Liza Data Collection Framework.
+ *
+ * liza is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ *
+ * Get deltas from the mongo document in order to process and publish them
+ */
+
+import { DocumentId } from "../../document/Document";
+import { PositiveInteger } from "../../numeric";
+import { MongoCollection } from "mongodb";
+import { DeltaDao } from "./DeltaDao";
+
+
+export type MongoDeltaType = 'ratedata' | 'data';
+
+
+/** Manage deltas */
+export class MongoDeltaDao implements DeltaDao
+{
+ /** The ratedata delta type */
+ static readonly DELTA_RATEDATA: string = 'ratedata';
+
+ /** The data delta type */
+ static readonly DELTA_DATA: string = 'data';
+
+
+ /**
+ * Initialize connection
+ *
+ * @param _collection Mongo collection
+ */
+ constructor(
+ private readonly _collection: MongoCollection,
+ ) {}
+
+
+ /**
+ * Get documents in need of processing
+ *
+ * @return documents in need of processing
+ */
+ getUnprocessedDocuments(
+ callback: ( data: Record | null ) => void,
+ ): this
+ {
+ var self = this;
+
+ this._collection.find(
+ { published: false },
+ {},
+ function( _err, cursor )
+ {
+ cursor.toArray( function( _err: NullableError, data: any[] )
+ {
+ // was the quote found?
+ if ( data.length == 0 )
+ {
+ callback.call( self, null );
+
+ return;
+ }
+
+ // return the quote data
+ callback.call( self, data );
+ });
+ }
+ )
+
+ return this;
+ }
+
+
+ /**
+ * Set the document's processed index
+ *
+ * @param doc_id - Document whose index will be set
+ * @param type - Delta type
+ * @param index - Index to set
+ * @param callback - Callback function
+ */
+ advanceDeltaIndexByType(
+ doc_id: DocumentId,
+ type: MongoDeltaType,
+ index: PositiveInteger,
+ callback: ( err: NullableError, indexAdvanced: boolean ) => void,
+ ): this
+ {
+ var self = this;
+
+ const set_data: Record = {};
+
+ set_data[ 'lastPublishDelta.' + type ] = index;
+
+ this._collection.update(
+ { id: doc_id },
+ { $set: set_data },
+ { upsert: true },
+ function( err )
+ {
+ if ( err )
+ {
+ callback.call( self, err, false );
+
+ return;
+ }
+
+ callback.call( self, null, true );
+
+ return;
+ }
+ );
+
+ return this;
+ }
+
+
+ /**
+ * Mark a given document as processed. First does a check to make sure that
+ * the document does not have a newer update timestamp than the provided one
+ *
+ * @param doc_id - The document to mark
+ * @param last_update_ts - The last time this document was updated
+ *
+ * @return true if the document was successfully marked as processed
+ */
+ markDocumentAsProcessed(
+ doc_id: DocumentId,
+ last_update_ts: UnixTimestamp,
+ callback: ( err: NullableError, indexAdvanced: boolean ) => void,
+ ): this
+ {
+ var self = this;
+
+ this._collection.update(
+ { id: doc_id, lastUpdate: { $gt: last_update_ts } },
+ { $set: { processed: true } },
+ { upsert: false },
+ function( err, result )
+ {
+ if ( err )
+ {
+ callback.call( self, err, false );
+
+ return;
+ }
+
+ console.log( '-------', result );
+
+ callback.call( self, null, true );
+
+ return;
+ }
+ );
+
+ return this;
+ }
+}
+
diff --git a/src/types/mongodb.d.ts b/src/types/mongodb.d.ts
index 4b0e21f..6cc221f 100644
--- a/src/types/mongodb.d.ts
+++ b/src/types/mongodb.d.ts
@@ -69,6 +69,9 @@ interface MongoFindOptions
{
/** Limit results returned */
limit?: PositiveInteger,
+
+ /** Whether to project only id's */
+ id?: number,
}
@@ -148,7 +151,7 @@ declare interface MongoCollection
/**
- * Execute a query and return the first result
+ * Execute a query and return the results
*
* Unlike `update`, the callback return value is not propagated, and so
* the callback ought not return anything.
diff --git a/test/server/db/MongoDeltaDaoTest.ts b/test/server/db/MongoDeltaDaoTest.ts
new file mode 100644
index 0000000..c3d0711
--- /dev/null
+++ b/test/server/db/MongoDeltaDaoTest.ts
@@ -0,0 +1,168 @@
+/**
+ * Token state management test
+ *
+ * Copyright (C) 2010-2019 R-T Specialty, LLC.
+ *
+ * This file is part of the Liza Data Collection Framework.
+ *
+ * liza is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+import { MongoDeltaDao as Sut } from "../../../src/server/db/MongoDeltaDao";
+import { MongoCollection } from "mongodb";
+import { PositiveInteger } from "../../../src/numeric";
+import { DocumentId } from "../../../src/document/Document";
+
+import { expect, use as chai_use } from 'chai';
+chai_use( require( 'chai-as-promised' ) );
+
+
+describe( 'server.db.MongoDeltaDao', () =>
+{
+ describe( '#getUnprocessedDocuments', () =>
+ {
+ it( 'gets documents', () =>
+ {
+ let returned_data = { foo: 'bar' };
+ let callback_called = false;
+
+ const collection = _createMockMongoCollection();
+ collection.find = ( _: any, __: any, c: any ) =>
+ {
+ c( null, {
+ toArray: ( c: any ) => c( null, returned_data ),
+ } );
+ };
+
+ const callback = ( data: Record | null ) =>
+ {
+ expect( returned_data ).to.deep.equal( data );
+
+ callback_called = true;
+
+ return;
+ };
+
+ new Sut( collection ).getUnprocessedDocuments( callback );
+
+ expect( callback_called ).to.equal( true );
+ });
+ });
+
+ describe( '#advanceDeltaIndexByType', () =>
+ {
+ it( 'advances specified index', () =>
+ {
+ const quote_id = 123,
+ delta_type = 'ratedata',
+ expected_field = 'lastPublishDelta.ratedata',
+ index = 1;
+
+ let callback_called = false;
+
+ const collection = _createMockMongoCollection();
+ collection.update = (
+ given_quote_id: any,
+ given_delta_type: any,
+ _given_index: any,
+ given_callback: any
+ ) =>
+ {
+ const expected_set: Record = {};
+
+ expected_set[ expected_field ] = index
+
+ expect( given_quote_id ).to.deep.equal( { id: quote_id } );
+ expect( given_delta_type )
+ .to.deep.equal( { $set: expected_set } );
+
+ given_callback( null );
+ };
+
+ const callback = ( _err: NullableError, _indexAdvanced: boolean ) =>
+ {
+ callback_called = true;
+
+ return;
+ };
+
+ new Sut( collection ).advanceDeltaIndexByType(
+ quote_id,
+ delta_type,
+ index,
+ callback,
+ );
+
+ expect( callback_called ).to.equal( true );
+ });
+ });
+
+ describe( '#markDocumentAsProcessed', () =>
+ {
+ it( 'doc marked if provided timestamp <= doc timestamp', () =>
+ {
+ const quote_id = 123,
+ last_update_ts = 1573582767;
+
+ let callback_called = false;
+
+ const collection = _createMockMongoCollection();
+ collection.update = (
+ given_filter: any,
+ _: any,
+ __: any,
+ given_callback: any
+ ) =>
+ {
+ const expected_filter: Record = {
+ id: quote_id,
+ lastUpdate: { $gt: last_update_ts }
+ };
+
+ expect( given_filter ).to.deep.equal( expected_filter );
+
+ given_callback( null );
+ };
+
+ const callback = ( _err: NullableError, _indexAdvanced: boolean ) =>
+ {
+ callback_called = true;
+
+ return;
+ };
+
+ new Sut( collection ).markDocumentAsProcessed(
+ quote_id,
+ last_update_ts,
+ callback,
+ );
+
+ expect( callback_called ).to.equal( true );
+ });
+ });
+
+} );
+
+
+function _createMockMongoCollection(): MongoCollection
+{
+ return {
+ findOne() {},
+ update() {},
+ findAndModify() {},
+ find() {},
+ createIndex() {},
+ insert() {},
+ };
+}