From c5733d1dfff5bd71ecfc344dc5c7021c5bbc7ead Mon Sep 17 00:00:00 2001 From: Austin Schaffer Date: Tue, 12 Nov 2019 16:41:31 -0500 Subject: [PATCH] [DEV-5312] Add interface for amqp publisher and implement a delta publisher --- package.json.in | 3 +- src/server/db/MongoServerDao.js | 482 ------------------------------ src/system/AmqpPublisher.ts | 42 +++ src/system/DeltaProcessor.ts | 14 +- src/system/DeltaPublisher.ts | 133 +++++++++ test/system/DeltaProcessorTest.ts | 21 +- test/system/DeltaPublisherTest.ts | 49 +++ 7 files changed, 249 insertions(+), 495 deletions(-) delete mode 100644 src/server/db/MongoServerDao.js create mode 100644 src/system/AmqpPublisher.ts create mode 100644 src/system/DeltaPublisher.ts create mode 100644 test/system/DeltaPublisherTest.ts diff --git a/package.json.in b/package.json.in index c0db61e..d81861d 100644 --- a/package.json.in +++ b/package.json.in @@ -38,7 +38,8 @@ "mocha": "5.2.0", "@types/mocha": "5.2.0", "sinon": ">=1.17.4", - "es6-promise": "~3" + "es6-promise": "~3", + "@types/amqplib": "0.5.13" }, "licenses": [ diff --git a/src/server/db/MongoServerDao.js b/src/server/db/MongoServerDao.js deleted file mode 100644 index 34878c4..0000000 --- a/src/server/db/MongoServerDao.js +++ /dev/null @@ -1,482 +0,0 @@ -"use strict"; -/** - * Mongo DB DAO for program server - * - * Copyright (C) 2010-2019 R-T Specialty, LLC. - * - * This file is part of the Liza Data Collection Framework. - * - * liza is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ -var __extends = (this && this.__extends) || (function () { - var extendStatics = function (d, b) { - extendStatics = Object.setPrototypeOf || - ({ __proto__: [] } instanceof Array && function (d, b) { d.__proto__ = b; }) || - function (d, b) { for (var p in b) if (b.hasOwnProperty(p)) d[p] = b[p]; }; - return extendStatics(d, b); - }; - return function (d, b) { - extendStatics(d, b); - function __() { this.constructor = d; } - d.prototype = b === null ? Object.create(b) : (__.prototype = b.prototype, new __()); - }; -})(); -Object.defineProperty(exports, "__esModule", { value: true }); -var EventEmitter = require('events').EventEmitter; -/** - * Uses MongoDB as a data store - */ -var MongoServerDao = /** @class */ (function (_super) { - __extends(MongoServerDao, _super); - /** - * Initializes DAO - * - * @param {Mongo.Db} db mongo database connection - */ - function MongoServerDao(_db) { - var _this = _super.call(this) || this; - _this._db = _db; - /** Collection used to store quotes */ - _this.COLLECTION = 'quotes'; - /** Sequence (auto-increment) collection */ - _this.COLLECTION_SEQ = 'seq'; - /** Sequence key for quote ids */ - _this.SEQ_QUOTE_ID = 'quoteId'; - /** Sequence quoteId default */ - _this.SEQ_QUOTE_ID_DEFAULT = 200000; - /** Whether the DAO is initialized and ready to be used */ - _this._ready = false; - return _this; - } - /** - * Initializes error events and attempts to connect to the database - * - * connectError event will be emitted on failure. - * - * @param Function callback function to call when connection is complete - * (will not be called if connection fails) - * - * @return MongoServerDao self to allow for method chaining - */ - MongoServerDao.prototype.init = function (callback) { - var dao = this; - // map db error event (on connection error) to our connectError event - this._db.on('error', function (err) { - dao._ready = false; - dao._collection = null; - dao.emit('connectError', err); - }); - this.connect(callback); - return this; - }; - /** - * Attempts to connect to the database - * - * connectError event will be emitted on failure. - * - * @param Function callback function to call when connection is complete - * (will not be called if connection fails) - * - * @return MongoServerDao self to allow for method chaining - */ - MongoServerDao.prototype.connect = function (callback) { - var dao = this; - // attempt to connect to the database - this._db.open(function (err, db) { - // if there was an error, don't bother with anything else - if (err) { - // in some circumstances, it may just be telling us that we're - // already connected (even though the connection may have been - // broken) - if (err.errno !== undefined) { - dao.emit('connectError', err); - return; - } - } - var ready_count = 0; - var check_ready = function () { - if (++ready_count < 2) { - return; - } - // we're ready to roll! - dao._ready = true; - dao.emit('ready'); - // connection was successful; call the callback - if (callback instanceof Function) { - callback.call(dao); - } - }; - // quotes collection - db.collection(dao.COLLECTION, function (_err, collection) { - // for some reason this gets called more than once - if (collection == null) { - return; - } - // initialize indexes - collection.createIndex([['id', 1]], true, function (_err, _index) { - // mark the DAO as ready to be used - dao._collection = collection; - check_ready(); - }); - }); - // seq collection - db.collection(dao.COLLECTION_SEQ, function (err, collection) { - if (err) { - dao.emit('seqError', err); - return; - } - if (collection == null) { - return; - } - dao._seqCollection = collection; - // has the sequence we'll be referencing been initialized? - collection.find({ _id: dao.SEQ_QUOTE_ID }, { limit: 1 }, function (err, cursor) { - if (err) { - dao._initQuoteIdSeq(check_ready); - return; - } - cursor.toArray(function (_err, data) { - if (data.length == 0) { - dao._initQuoteIdSeq(check_ready); - return; - } - check_ready(); - }); - }); - }); - }); - return this; - }; - MongoServerDao.prototype._initQuoteIdSeq = function (callback) { - var dao = this; - this._seqCollection.insert({ - _id: this.SEQ_QUOTE_ID, - val: this.SEQ_QUOTE_ID_DEFAULT, - }, function (err, _docs) { - if (err) { - dao.emit('seqError', err); - return; - } - dao.emit('seqInit', dao.SEQ_QUOTE_ID); - callback.call(dao); - }); - }; - /** - * Saves a quote to the database - * - * A full save will include all metadata. This should not cause any - * problems with race conditions for pending Data API calls on meta - * fields because those results write to individual indexes and do not - * rely on existing data. - * - * @param Quote quote the quote to save - * @param Function success_callback function to call on success - * @param Function failure_callback function to call if save fails - * @param Object save_data quote data to save (optional) - * @param Object push_data quote data to push (optional) - */ - MongoServerDao.prototype.saveQuote = function (quote, success_callback, failure_callback, save_data, push_data) { - var dao = this; - var meta = {}; - // if we're not ready, then we can't save the quote! - if (this._ready === false) { - this.emit('saveQuoteError', { message: 'Database server not ready' }, Error('Database not ready'), quote); - failure_callback.call(this, quote); - return dao; - } - if (save_data === undefined) { - save_data = { - data: quote.getBucket().getData(), - }; - // full save will include all metadata - meta = quote.getMetabucket().getData(); - } - var id = quote.getId(); - // some data should always be saved because the quote will be created if - // it does not yet exist - save_data.id = id; - save_data.pver = quote.getProgramVersion(); - save_data.importDirty = 1; - save_data.lastPremDate = quote.getLastPremiumDate(); - save_data.initialRatedDate = quote.getRatedDate(); - save_data.explicitLock = quote.getExplicitLockReason(); - save_data.explicitLockStepId = quote.getExplicitLockStep(); - save_data.importedInd = +quote.isImported(); - save_data.boundInd = +quote.isBound(); - save_data.lastUpdate = Math.round((new Date()).getTime() / 1000); - // meta will eventually take over for much of the above data - meta.liza_timestamp_initial_rated = [quote.getRatedDate()]; - // save the stack so we can track this call via the oplog - save_data._stack = (new Error()).stack; - // avoid wiping out other metadata (since this may not be a full set) - Object.keys(meta).forEach(function (key) { return save_data['meta.' + key] = meta[key]; }); - // do not push empty objects - var document = (!push_data || !Object.keys(push_data).length) - ? { '$set': save_data } - : { '$set': save_data, '$push': push_data }; - // update the quote data if it already exists (same id), otherwise - // insert it - this._collection.update({ id: id }, document, - // create record if it does not yet exist - { upsert: true }, - // on complete - function (err, _docs) { - // if an error occurred, then we cannot continue - if (err) { - dao.emit('saveQuoteError', err, quote); - // let the caller handle the error - if (failure_callback instanceof Function) { - failure_callback.call(dao, quote); - } - return; - } - // successful - if (success_callback instanceof Function) { - success_callback.call(dao, quote); - } - }); - return this; - }; - /** - * Merges quote data with the existing (rather than overwriting) - * - * @param {Quote} quote quote to save - * @param {Object} data quote data - * @param {Function} scallback successful callback - * @param {Function} fcallback failure callback - */ - MongoServerDao.prototype.mergeData = function (quote, data, scallback, fcallback) { - // we do not want to alter the original data; use it as a prototype - var update = data; - // save the stack so we can track this call via the oplog - var _self = this; - this._collection.update({ id: quote.getId() }, { '$set': update }, {}, function (err, _docs) { - if (err) { - _self.emit('saveQuoteError', err, quote); - if (typeof fcallback === 'function') { - fcallback(quote); - } - return; - } - if (typeof scallback === 'function') { - scallback(quote); - } - }); - return this; - }; - /** - * Merges bucket data with the existing bucket (rather than overwriting the - * entire bucket) - * - * @param {Quote} quote quote to save - * @param {Object} data bucket data - * @param {Function} scallback successful callback - * @param {Function} fcallback failure callback - * - * @return {MongoServerDao} self - */ - MongoServerDao.prototype.mergeBucket = function (quote, data, success, failure) { - var update = {}; - for (var field in data) { - if (!field) { - continue; - } - update['data.' + field] = data[field]; - } - return this.mergeData(quote, update, success, failure); - }; - /** - * Saves the quote state to the database - * - * The quote state includes the current step, the top visited step and the - * explicit lock message. - * - * @param Quote quote the quote to save - * @param Function success_callback function to call on success - * @param Function failure_callback function to call if save fails - * - * @return MongoServerDao self - */ - MongoServerDao.prototype.saveQuoteState = function (quote, success_callback, failure_callback) { - var update = { - currentStepId: quote.getCurrentStepId(), - topVisitedStepId: quote.getTopVisitedStepId(), - topSavedStepId: quote.getTopSavedStepId(), - }; - return this.mergeData(quote, update, success_callback, failure_callback); - }; - MongoServerDao.prototype.saveQuoteClasses = function (quote, classes, success, failure) { - return this.mergeData(quote, { classData: classes }, success, failure); - }; - /** - * Save document metadata (meta field on document) - * - * Only the provided indexes will be modified (that is---data will be - * merged with what is already in the database). - * - * @param {Quote} quote destination quote - * @param {Object} new_meta bucket-formatted data to write - * @param {Function} success callback on success - * @param {Function} failure callback on error - * - * @return {undefined} - */ - MongoServerDao.prototype.saveQuoteMeta = function (quote, new_meta, success, failure) { - var update = {}; - for (var key in new_meta) { - var meta = new_meta[key]; - for (var i in meta) { - update['meta.' + key + '.' + i] = new_meta[key][i]; - } - } - this.mergeData(quote, update, success, failure); - }; - /** - * Saves the quote lock state to the database - * - * @param Quote quote the quote to save - * @param Function success_callback function to call on success - * @param Function failure_callback function to call if save fails - * - * @return MongoServerDao self - */ - MongoServerDao.prototype.saveQuoteLockState = function (quote, success_callback, failure_callback) { - // lock state is saved by default - return this.saveQuote(quote, success_callback, failure_callback, {}); - }; - /** - * Pulls quote data from the database - * - * @param Integer quote_id id of quote - * @param Function( data ) callback function to call when data is available - * - * @return MongoServerDao self to allow for method chaining - */ - MongoServerDao.prototype.pullQuote = function (quote_id, callback) { - var dao = this; - // XXX: TODO: Do not read whole of record into memory; filter out - // revisions! - this._collection.find({ id: quote_id }, { limit: 1 }, function (_err, cursor) { - cursor.toArray(function (_err, data) { - // was the quote found? - if (data.length == 0) { - callback.call(dao, null); - return; - } - // return the quote data - callback.call(dao, data[0]); - }); - }); - return this; - }; - MongoServerDao.prototype.getMinQuoteId = function (callback) { - // just in case it's asynchronous later on - callback.call(this, this.SEQ_QUOTE_ID_DEFAULT); - return this; - }; - MongoServerDao.prototype.getMaxQuoteId = function (callback) { - var dao = this; - this._seqCollection.find({ _id: this.SEQ_QUOTE_ID }, { limit: 1 }, function (_err, cursor) { - cursor.toArray(function (_err, data) { - if (data.length == 0) { - callback.call(dao, 0); - return; - } - // return the max quote id - callback.call(dao, data[0].val); - }); - }); - }; - MongoServerDao.prototype.getNextQuoteId = function (callback) { - var dao = this; - this._seqCollection.findAndModify({ _id: this.SEQ_QUOTE_ID }, [['val', 'descending']], { $inc: { val: 1 } }, { 'new': true }, function (err, doc) { - if (err) { - dao.emit('seqError', err); - callback.call(dao, 0); - return; - } - // return the new id - callback.call(dao, doc.val); - }); - return this; - }; - /** - * Create a new revision with the provided quote data - * - * The revision will contain the whole the quote. If space is a concern, we - * can (in the future) calculate a delta instead (Mike recommends the Git - * model of storing the deltas in previous revisions and the whole of the - * bucket in the most recently created revision). - */ - MongoServerDao.prototype.createRevision = function (quote, callback) { - var _self = this, qid = quote.getId(), data = quote.getBucket().getData(); - this._collection.update({ id: qid }, { '$push': { revisions: { data: data } } }, - // create record if it does not yet exist - { upsert: true }, - // on complete - function (err) { - if (err) { - _self.emit('mkrevError', err); - } - callback(err); - return; - }); - }; - MongoServerDao.prototype.getRevision = function (quote, revid, callback) { - revid = +revid; - // XXX: TODO: Filter out all but the revision we want - this._collection.find({ id: quote.getId() }, { limit: 1 }, function (_err, cursor) { - cursor.toArray(function (_err, data) { - // was the quote found? - if ((data.length === 0) - || (data[0].revisions.length < (revid + 1))) { - callback(null); - return; - } - // return the quote data - callback(data[0].revisions[revid]); - }); - }); - }; - MongoServerDao.prototype.setWorksheets = function (qid, data, callback) { - this._collection.update({ id: qid }, { '$set': { worksheets: { data: data } } }, - // create record if it does not yet exist - { upsert: true }, - // on complete - function (err) { - callback(err); - return; - }); - }; - MongoServerDao.prototype.getWorksheet = function (qid, supplier, index, callback) { - this._collection.find({ id: qid }, { limit: 1 }, function (_err, cursor) { - cursor.toArray(function (_err, data) { - // was the quote found? - if ((data.length === 0) - || (!data[0].worksheets) - || (!data[0].worksheets.data) - || (!data[0].worksheets.data[supplier])) { - callback(null); - return; - } - // return the quote data - callback(data[0].worksheets.data[supplier][index]); - }); - }); - }; - return MongoServerDao; -}(EventEmitter)); -exports.MongoServerDao = MongoServerDao; -; -//# sourceMappingURL=data:application/json;base64,{"version":3,"file":"MongoServerDao.js","sourceRoot":"","sources":["MongoServerDao.ts"],"names":[],"mappings":";AAAA;;;;;;;;;;;;;;;;;;;GAmBG;;;;;;;;;;;;;;;AASH,IAAM,YAAY,GAAG,OAAO,CAAE,QAAQ,CAAE,CAAC,YAAY,CAAC;AAItD;;GAEG;AACH;IAAoC,kCAAY;IAyB5C;;;;OAIG;IACH,wBACqB,GAAQ;QAD7B,YAII,iBAAO,SACV;QAJoB,SAAG,GAAH,GAAG,CAAK;QA7B7B,sCAAsC;QAC7B,gBAAU,GAAW,QAAQ,CAAC;QAEvC,2CAA2C;QAClC,oBAAc,GAAW,KAAK,CAAC;QAExC,iCAAiC;QACxB,kBAAY,GAAW,SAAS,CAAC;QAE1C,+BAA+B;QACtB,0BAAoB,GAAW,MAAM,CAAC;QAG/C,0DAA0D;QAClD,YAAM,GAAY,KAAK,CAAC;;IAmBhC,CAAC;IAGD;;;;;;;;;OASG;IACH,6BAAI,GAAJ,UAAM,QAAkB;QAEpB,IAAI,GAAG,GAAG,IAAI,CAAC;QAEf,qEAAqE;QACrE,IAAI,CAAC,GAAG,CAAC,EAAE,CAAE,OAAO,EAAE,UAAU,GAAQ;YAEpC,GAAG,CAAC,MAAM,GAAQ,KAAK,CAAC;YACxB,GAAG,CAAC,WAAW,GAAG,IAAI,CAAC;YAEvB,GAAG,CAAC,IAAI,CAAE,cAAc,EAAE,GAAG,CAAE,CAAC;QACpC,CAAC,CAAC,CAAC;QAEH,IAAI,CAAC,OAAO,CAAE,QAAQ,CAAE,CAAC;QACzB,OAAO,IAAI,CAAC;IAChB,CAAC;IAGD;;;;;;;;;OASG;IACH,gCAAO,GAAP,UAAS,QAAkB;QAEvB,IAAI,GAAG,GAAG,IAAI,CAAC;QAEf,qCAAqC;QACrC,IAAI,CAAC,GAAG,CAAC,IAAI,CAAE,UAAU,GAAQ,EAAE,EAAO;YAEtC,yDAAyD;YACzD,IAAK,GAAG,EACR;gBACI,8DAA8D;gBAC9D,8DAA8D;gBAC9D,UAAU;gBACV,IAAK,GAAG,CAAC,KAAK,KAAK,SAAS,EAC5B;oBACI,GAAG,CAAC,IAAI,CAAE,cAAc,EAAE,GAAG,CAAE,CAAC;oBAChC,OAAO;iBACV;aACJ;YAED,IAAI,WAAW,GAAG,CAAC,CAAC;YACpB,IAAI,WAAW,GAAG;gBAEd,IAAK,EAAE,WAAW,GAAG,CAAC,EACtB;oBACI,OAAO;iBACV;gBAED,uBAAuB;gBACvB,GAAG,CAAC,MAAM,GAAG,IAAI,CAAC;gBAClB,GAAG,CAAC,IAAI,CAAE,OAAO,CAAE,CAAC;gBAEpB,+CAA+C;gBAC/C,IAAK,QAAQ,YAAY,QAAQ,EACjC;oBACI,QAAQ,CAAC,IAAI,CAAE,GAAG,CAAE,CAAC;iBACxB;YACL,CAAC,CAAA;YAED,oBAAoB;YACpB,EAAE,CAAC,UAAU,CACT,GAAG,CAAC,UAAU,EACd,UACI,IAAe,EACf,UAA2B;gBAE3B,kDAAkD;gBAClD,IAAK,UAAU,IAAI,IAAI,EACvB;oBACI,OAAO;iBACV;gBAED,qBAAqB;gBACrB,UAAU,CAAC,WAAW,CAClB,CAAE,CAAC,IAAI,EAAE,CAAC,CAAC,CAAE,EACb,IAAI,EACJ,UAAU,IAAS,EAAE,MAA4B;oBAE7C,mCAAmC;oBACnC,GAAG,CAAC,WAAW,GAAG,UAAU,CAAC;oBAC7B,WAAW,EAAE,CAAC;gBAClB,CAAC,CACJ,CAAC;YACN,CAAC,CACJ,CAAC;YAEF,iBAAiB;YACjB,EAAE,CAAC,UAAU,CACT,GAAG,CAAC,cAAc,EAClB,UACI,GAAe,EACf,UAA2B;gBAE3B,IAAK,GAAG,EACR;oBACI,GAAG,CAAC,IAAI,CAAE,UAAU,EAAE,GAAG,CAAE,CAAC;oBAC5B,OAAO;iBACV;gBAED,IAAK,UAAU,IAAI,IAAI,EACvB;oBACI,OAAO;iBACV;gBAED,GAAG,CAAC,cAAc,GAAG,UAAU,CAAC;gBAEhC,0DAA0D;gBAC1D,UAAU,CAAC,IAAI,CACX,EAAE,GAAG,EAAE,GAAG,CAAC,YAAY,EAAE,EACzB,EAAE,KAAK,EAAmB,CAAC,EAAE,EAC7B,UAAU,GAAQ,EAAE,MAAM;oBAEtB,IAAK,GAAG,EACR;wBACI,GAAG,CAAC,eAAe,CAAE,WAAW,CAAE,CAAA;wBAClC,OAAO;qBACV;oBAED,MAAM,CAAC,OAAO,CAAE,UAAU,IAAS,EAAE,IAAW;wBAE5C,IAAK,IAAI,CAAC,MAAM,IAAI,CAAC,EACrB;4BACI,GAAG,CAAC,eAAe,CAAE,WAAW,CAAE,CAAC;4BACnC,OAAO;yBACV;wBAED,WAAW,EAAE,CAAC;oBAClB,CAAC,CAAC,CAAC;gBACP,CAAC,CACJ,CAAC;YACN,CAAC,CACJ,CAAC;QACN,CAAC,CAAC,CAAC;QAEH,OAAO,IAAI,CAAC;IAChB,CAAC;IAGO,wCAAe,GAAvB,UAAyB,QAAoB;QAEzC,IAAI,GAAG,GAAG,IAAI,CAAC;QAEf,IAAI,CAAC,cAAe,CAAC,MAAM,CACvB;YACI,GAAG,EAAE,IAAI,CAAC,YAAY;YACtB,GAAG,EAAE,IAAI,CAAC,oBAAoB;SACjC,EACD,UAAU,GAAQ,EAAE,KAAU;YAE1B,IAAK,GAAG,EACR;gBACI,GAAG,CAAC,IAAI,CAAE,UAAU,EAAE,GAAG,CAAE,CAAC;gBAC5B,OAAO;aACV;YAED,GAAG,CAAC,IAAI,CAAE,SAAS,EAAE,GAAG,CAAC,YAAY,CAAE,CAAC;YACxC,QAAQ,CAAC,IAAI,CAAE,GAAG,CAAE,CAAC;QACzB,CAAC,CACJ,CAAC;IACN,CAAC;IAGD;;;;;;;;;;;;;OAaG;IACH,kCAAS,GAAT,UACI,KAAiC,EACjC,gBAA0B,EAC1B,gBAA0B,EAC1B,SAAqB,EACrB,SAAqB;QAGrB,IAAI,GAAG,GAAyB,IAAI,CAAC;QACrC,IAAI,IAAI,GAAwB,EAAE,CAAC;QAEnC,oDAAoD;QACpD,IAAK,IAAI,CAAC,MAAM,KAAK,KAAK,EAC1B;YACI,IAAI,CAAC,IAAI,CAAE,gBAAgB,EACvB,EAAE,OAAO,EAAE,2BAA2B,EAAE,EACxC,KAAK,CAAE,oBAAoB,CAAE,EAC7B,KAAK,CACR,CAAC;YAEF,gBAAgB,CAAC,IAAI,CAAE,IAAI,EAAE,KAAK,CAAE,CAAC;YACrC,OAAO,GAAG,CAAC;SACd;QAED,IAAK,SAAS,KAAK,SAAS,EAC5B;YACI,SAAS,GAAG;gBACR,IAAI,EAAE,KAAK,CAAC,SAAS,EAAE,CAAC,OAAO,EAAE;aACpC,CAAC;YAEF,sCAAsC;YACtC,IAAI,GAAG,KAAK,CAAC,aAAa,EAAE,CAAC,OAAO,EAAE,CAAC;SAC1C;QAED,IAAI,EAAE,GAAG,KAAK,CAAC,KAAK,EAAE,CAAC;QAEvB,wEAAwE;QACxE,wBAAwB;QACxB,SAAS,CAAC,EAAE,GAAmB,EAAE,CAAC;QAClC,SAAS,CAAC,IAAI,GAAiB,KAAK,CAAC,iBAAiB,EAAE,CAAC;QACzD,SAAS,CAAC,WAAW,GAAU,CAAC,CAAC;QACjC,SAAS,CAAC,YAAY,GAAS,KAAK,CAAC,kBAAkB,EAAE,CAAC;QAC1D,SAAS,CAAC,gBAAgB,GAAK,KAAK,CAAC,YAAY,EAAE,CAAC;QACpD,SAAS,CAAC,YAAY,GAAS,KAAK,CAAC,qBAAqB,EAAE,CAAC;QAC7D,SAAS,CAAC,kBAAkB,GAAG,KAAK,CAAC,mBAAmB,EAAE,CAAC;QAC3D,SAAS,CAAC,WAAW,GAAU,CAAC,KAAK,CAAC,UAAU,EAAE,CAAC;QACnD,SAAS,CAAC,QAAQ,GAAa,CAAC,KAAK,CAAC,OAAO,EAAE,CAAC;QAChD,SAAS,CAAC,UAAU,GAAW,IAAI,CAAC,KAAK,CACrC,CAAE,IAAI,IAAI,EAAE,CAAE,CAAC,OAAO,EAAE,GAAG,IAAI,CAClC,CAAC;QAEF,4DAA4D;QAC5D,IAAI,CAAC,4BAA4B,GAAG,CAAE,KAAK,CAAC,YAAY,EAAE,CAAE,CAAC;QAE7D,yDAAyD;QACzD,SAAS,CAAC,MAAM,GAAG,CAAE,IAAI,KAAK,EAAE,CAAE,CAAC,KAAK,CAAC;QAEzC,qEAAqE;QACrE,MAAM,CAAC,IAAI,CAAE,IAAI,CAAE,CAAC,OAAO,CACvB,UAAA,GAAG,IAAI,OAAA,SAAS,CAAE,OAAO,GAAG,GAAG,CAAE,GAAG,IAAI,CAAE,GAAG,CAAE,EAAxC,CAAwC,CAClD,CAAC;QAEF,4BAA4B;QAC5B,IAAM,QAAQ,GAAG,CAAE,CAAC,SAAS,IAAI,CAAC,MAAM,CAAC,IAAI,CAAE,SAAS,CAAE,CAAC,MAAM,CAAE;YAC/D,CAAC,CAAC,EAAE,MAAM,EAAE,SAAS,EAAE;YACvB,CAAC,CAAC,EAAE,MAAM,EAAE,SAAS,EAAE,OAAO,EAAE,SAAS,EAAE,CAAC;QAEhD,kEAAkE;QAClE,YAAY;QACZ,IAAI,CAAC,WAAY,CAAC,MAAM,CAAE,EAAE,EAAE,EAAE,EAAE,EAAE,EAChC,QAAQ;QAER,yCAAyC;QACzC,EAAE,MAAM,EAAE,IAAI,EAAE;QAEhB,cAAc;QACd,UAAU,GAAG,EAAE,KAAK;YAEhB,gDAAgD;YAChD,IAAK,GAAG,EACR;gBACI,GAAG,CAAC,IAAI,CAAE,gBAAgB,EAAE,GAAG,EAAE,KAAK,CAAE,CAAC;gBAEzC,kCAAkC;gBAClC,IAAK,gBAAgB,YAAY,QAAQ,EACzC;oBACI,gBAAgB,CAAC,IAAI,CAAE,GAAG,EAAE,KAAK,CAAE,CAAC;iBACvC;gBAED,OAAO;aACV;YAED,aAAa;YACb,IAAK,gBAAgB,YAAY,QAAQ,EACzC;gBACI,gBAAgB,CAAC,IAAI,CAAE,GAAG,EAAE,KAAK,CAAE,CAAC;aACvC;QACL,CAAC,CACJ,CAAC;QAEF,OAAO,IAAI,CAAC;IAChB,CAAC;IAGD;;;;;;;OAOG;IACH,kCAAS,GAAT,UACI,KAA0B,EAC1B,IAAsB,EACtB,SAAmB,EACnB,SAAmB;QAGnB,mEAAmE;QACnE,IAAI,MAAM,GAAG,IAAI,CAAC;QAElB,yDAAyD;QACzD,IAAI,KAAK,GAAG,IAAI,CAAC;QACjB,IAAI,CAAC,WAAY,CAAC,MAAM,CAAE,EAAE,EAAE,EAAE,KAAK,CAAC,KAAK,EAAE,EAAE,EAC3C,EAAE,MAAM,EAAE,MAAM,EAAE,EAClB,EAAE,EAEF,UAAU,GAAG,EAAE,KAAK;YAEhB,IAAK,GAAG,EACR;gBACI,KAAK,CAAC,IAAI,CAAE,gBAAgB,EAAE,GAAG,EAAE,KAAK,CAAE,CAAC;gBAE3C,IAAK,OAAO,SAAS,KAAK,UAAU,EACpC;oBACI,SAAS,CAAE,KAAK,CAAE,CAAC;iBACtB;gBAED,OAAO;aACV;YAED,IAAK,OAAO,SAAS,KAAK,UAAU,EACpC;gBACI,SAAS,CAAE,KAAK,CAAE,CAAC;aACtB;QACL,CAAC,CACJ,CAAC;QAEF,OAAO,IAAI,CAAC;IAChB,CAAC;IAGD;;;;;;;;;;OAUG;IACH,oCAAW,GAAX,UACI,KAAwB,EACxB,IAAoB,EACpB,OAAiB,EACjB,OAAiB;QAGjB,IAAI,MAAM,GAAgB,EAAE,CAAC;QAE7B,KAAM,IAAI,KAAK,IAAI,IAAI,EACvB;YACI,IAAK,CAAC,KAAK,EACX;gBACI,SAAS;aACZ;YAED,MAAM,CAAE,OAAO,GAAG,KAAK,CAAE,GAAG,IAAI,CAAE,KAAK,CAAE,CAAC;SAC7C;QAED,OAAO,IAAI,CAAC,SAAS,CAAE,KAAK,EAAE,MAAM,EAAE,OAAO,EAAE,OAAO,CAAE,CAAC;IAC7D,CAAC;IAGD;;;;;;;;;;;OAWG;IACH,uCAAc,GAAd,UACI,KAAiC,EACjC,gBAAqB,EACrB,gBAAqB;QAGrB,IAAI,MAAM,GAAG;YACT,aAAa,EAAK,KAAK,CAAC,gBAAgB,EAAE;YAC1C,gBAAgB,EAAE,KAAK,CAAC,mBAAmB,EAAE;YAC7C,cAAc,EAAI,KAAK,CAAC,iBAAiB,EAAE;SAC9C,CAAC;QAEF,OAAO,IAAI,CAAC,SAAS,CACjB,KAAK,EAAE,MAAM,EAAE,gBAAgB,EAAE,gBAAgB,CACpD,CAAC;IACN,CAAC;IAGD,yCAAgB,GAAhB,UACI,KAAwB,EACxB,OAAY,EACZ,OAAY,EACZ,OAAY;QAGZ,OAAO,IAAI,CAAC,SAAS,CACjB,KAAK,EACL,EAAE,SAAS,EAAE,OAAO,EAAE,EACtB,OAAO,EACP,OAAO,CACV,CAAC;IACN,CAAC;IAGD;;;;;;;;;;;;OAYG;IACH,sCAAa,GAAb,UACI,KAAyB,EACzB,QAAa,EACb,OAAkB,EAClB,OAAkB;QAGlB,IAAM,MAAM,GAAgB,EAAE,CAAC;QAE/B,KAAM,IAAI,GAAG,IAAI,QAAQ,EACzB;YACI,IAAI,IAAI,GAAG,QAAQ,CAAE,GAAG,CAAE,CAAC;YAE3B,KAAM,IAAI,CAAC,IAAI,IAAI,EACnB;gBACI,MAAM,CAAE,OAAO,GAAG,GAAG,GAAG,GAAG,GAAG,CAAC,CAAE,GAAG,QAAQ,CAAE,GAAG,CAAE,CAAE,CAAC,CAAE,CAAC;aAC5D;SACJ;QAED,IAAI,CAAC,SAAS,CAAE,KAAK,EAAE,MAAM,EAAE,OAAO,EAAE,OAAO,CAAE,CAAC;IACtD,CAAC;IAGD;;;;;;;;OAQG;IACH,2CAAkB,GAAlB,UACI,KAAiC,EACjC,gBAA0B,EAC1B,gBAA0B;QAG1B,iCAAiC;QACjC,OAAO,IAAI,CAAC,SAAS,CACjB,KAAK,EACL,gBAAgB,EAChB,gBAAgB,EAChB,EAAE,CACL,CAAC;IACN,CAAC;IAGD;;;;;;;OAOG;IACH,kCAAS,GAAT,UACI,QAAyB,EACzB,QAAsD;QAGtD,IAAI,GAAG,GAAG,IAAI,CAAC;QAEf,iEAAiE;QACjE,aAAa;QACb,IAAI,CAAC,WAAY,CAAC,IAAI,CAAE,EAAE,EAAE,EAAE,QAAQ,EAAE,EAAE,EAAE,KAAK,EAAmB,CAAC,EAAE,EACnE,UAAU,IAAI,EAAE,MAAM;YAElB,MAAM,CAAC,OAAO,CAAE,UAAU,IAAmB,EAAE,IAAW;gBAEtD,uBAAuB;gBACvB,IAAK,IAAI,CAAC,MAAM,IAAI,CAAC,EACrB;oBACI,QAAQ,CAAC,IAAI,CAAE,GAAG,EAAE,IAAI,CAAE,CAAC;oBAC3B,OAAO;iBACV;gBAED,wBAAwB;gBACxB,QAAQ,CAAC,IAAI,CAAE,GAAG,EAAE,IAAI,CAAE,CAAC,CAAE,CAAE,CAAC;YACpC,CAAC,CAAC,CAAC;QACP,CAAC,CACJ,CAAC;QAEF,OAAO,IAAI,CAAC;IAChB,CAAC;IAGD,sCAAa,GAAb,UAAe,QAAoC;QAE/C,0CAA0C;QAC1C,QAAQ,CAAC,IAAI,CAAE,IAAI,EAAE,IAAI,CAAC,oBAAoB,CAAE,CAAC;QAEjD,OAAO,IAAI,CAAC;IAChB,CAAC;IAGD,sCAAa,GAAb,UAAe,QAAoC;QAE/C,IAAI,GAAG,GAAG,IAAI,CAAC;QAEf,IAAI,CAAC,cAAe,CAAC,IAAI,CACrB,EAAE,GAAG,EAAE,IAAI,CAAC,YAAY,EAAE,EAC1B,EAAE,KAAK,EAAmB,CAAC,EAAE,EAC7B,UAAU,IAAI,EAAE,MAAM;YAElB,MAAM,CAAC,OAAO,CAAE,UAAU,IAAmB,EAAE,IAAW;gBAEtD,IAAK,IAAI,CAAC,MAAM,IAAI,CAAC,EACrB;oBACI,QAAQ,CAAC,IAAI,CAAE,GAAG,EAAE,CAAC,CAAE,CAAC;oBACxB,OAAO;iBACV;gBAED,0BAA0B;gBAC1B,QAAQ,CAAC,IAAI,CAAE,GAAG,EAAE,IAAI,CAAE,CAAC,CAAE,CAAC,GAAG,CAAE,CAAC;YACxC,CAAC,CAAC,CAAC;QACP,CAAC,CACJ,CAAC;IACN,CAAC;IAGD,uCAAc,GAAd,UAAgB,QAAsC;QAElD,IAAI,GAAG,GAAG,IAAI,CAAC;QAEf,IAAI,CAAC,cAAe,CAAC,aAAa,CAC9B,EAAE,GAAG,EAAE,IAAI,CAAC,YAAY,EAAE,EAC1B,CAAE,CAAE,KAAK,EAAE,YAAY,CAAE,CAAE,EAC3B,EAAE,IAAI,EAAE,EAAE,GAAG,EAAE,CAAC,EAAE,EAAE,EACpB,EAAE,KAAK,EAAE,IAAI,EAAE,EAEf,UAAU,GAAG,EAAE,GAAG;YAEd,IAAK,GAAG,EACR;gBACI,GAAG,CAAC,IAAI,CAAE,UAAU,EAAE,GAAG,CAAE,CAAC;gBAE5B,QAAQ,CAAC,IAAI,CAAE,GAAG,EAAE,CAAC,CAAE,CAAC;gBACxB,OAAO;aACV;YAED,oBAAoB;YACpB,QAAQ,CAAC,IAAI,CAAE,GAAG,EAAE,GAAG,CAAC,GAAG,CAAE,CAAC;QAClC,CAAC,CACJ,CAAC;QAEF,OAAO,IAAI,CAAC;IAChB,CAAC;IAGD;;;;;;;OAOG;IACH,uCAAc,GAAd,UACI,KAAyB,EACzB,QAAuB;QAGvB,IAAI,KAAK,GAAG,IAAI,EACZ,GAAG,GAAK,KAAK,CAAC,KAAK,EAAE,EACrB,IAAI,GAAI,KAAK,CAAC,SAAS,EAAE,CAAC,OAAO,EAAE,CAAC;QAExC,IAAI,CAAC,WAAY,CAAC,MAAM,CAAE,EAAE,EAAE,EAAE,GAAG,EAAE,EACjC,EAAE,OAAO,EAAE,EAAE,SAAS,EAAE,EAAE,IAAI,EAAE,IAAI,EAAE,EAAE,EAAE;QAE1C,yCAAyC;QACzC,EAAE,MAAM,EAAE,IAAI,EAAE;QAEhB,cAAc;QACd,UAAU,GAAG;YAET,IAAK,GAAG,EACR;gBACI,KAAK,CAAC,IAAI,CAAE,YAAY,EAAE,GAAG,CAAE,CAAC;aACnC;YAED,QAAQ,CAAE,GAAG,CAAE,CAAC;YAChB,OAAO;QACX,CAAC,CACJ,CAAC;IACN,CAAC;IAGD,oCAAW,GAAX,UACI,KAAyB,EACzB,KAAyB,EACzB,QAAuB;QAGvB,KAAK,GAAoB,CAAC,KAAK,CAAC;QAEhC,qDAAqD;QACrD,IAAI,CAAC,WAAY,CAAC,IAAI,CAClB,EAAE,EAAE,EAAE,KAAK,CAAC,KAAK,EAAE,EAAE,EACrB,EAAE,KAAK,EAAmB,CAAC,EAAE,EAC7B,UAAU,IAAI,EAAE,MAAM;YAElB,MAAM,CAAC,OAAO,CAAE,UAAU,IAAmB,EAAE,IAAW;gBAEtD,uBAAuB;gBACvB,IAAK,CAAE,IAAI,CAAC,MAAM,KAAK,CAAC,CAAE;uBACnB,CAAE,IAAI,CAAE,CAAC,CAAE,CAAC,SAAS,CAAC,MAAM,GAAG,CAAE,KAAK,GAAG,CAAC,CAAE,CAAE,EAErD;oBACI,QAAQ,CAAE,IAAI,CAAE,CAAC;oBACjB,OAAO;iBACV;gBAED,wBAAwB;gBACxB,QAAQ,CAAE,IAAI,CAAE,CAAC,CAAE,CAAC,SAAS,CAAE,KAAK,CAAE,CAAE,CAAC;YAC7C,CAAC,CAAC,CAAC;QACP,CAAC,CACJ,CAAC;IACN,CAAC;IAGD,sCAAa,GAAb,UACI,GAAiB,EACjB,IAAqB,EACrB,QAA4B;QAG5B,IAAI,CAAC,WAAY,CAAC,MAAM,CAAE,EAAE,EAAE,EAAE,GAAG,EAAE,EACjC,EAAE,MAAM,EAAE,EAAE,UAAU,EAAE,EAAE,IAAI,EAAE,IAAI,EAAE,EAAE,EAAE;QAE1C,yCAAyC;QACzC,EAAE,MAAM,EAAE,IAAI,EAAE;QAEhB,cAAc;QACd,UAAU,GAAG;YAET,QAAQ,CAAE,GAAG,CAAE,CAAC;YAChB,OAAO;QACX,CAAC,CACJ,CAAC;IACN,CAAC;IAGD,qCAAY,GAAZ,UACI,GAAiB,EACjB,QAAgB,EAChB,KAAyB,EACzB,QAAgD;QAGhD,IAAI,CAAC,WAAY,CAAC,IAAI,CAClB,EAAE,EAAE,EAAE,GAAG,EAAE,EACX,EAAE,KAAK,EAAmB,CAAC,EAAE,EAC7B,UAAU,IAAI,EAAE,MAAM;YAElB,MAAM,CAAC,OAAO,CAAE,UAAU,IAAmB,EAAE,IAAW;gBAEtD,uBAAuB;gBACvB,IAAK,CAAE,IAAI,CAAC,MAAM,KAAK,CAAC,CAAE;uBACnB,CAAE,CAAC,IAAI,CAAE,CAAC,CAAE,CAAC,UAAU,CAAE;uBACzB,CAAE,CAAC,IAAI,CAAE,CAAC,CAAE,CAAC,UAAU,CAAC,IAAI,CAAE;uBAC9B,CAAE,CAAC,IAAI,CAAE,CAAC,CAAE,CAAC,UAAU,CAAC,IAAI,CAAE,QAAQ,CAAE,CAAE,EAEjD;oBACI,QAAQ,CAAE,IAAI,CAAE,CAAC;oBACjB,OAAO;iBACV;gBAED,wBAAwB;gBACxB,QAAQ,CAAE,IAAI,CAAE,CAAC,CAAE,CAAC,UAAU,CAAC,IAAI,CAAE,QAAQ,CAAE,CAAE,KAAK,CAAE,CAAE,CAAC;YAC/D,CAAC,CAAE,CAAC;QACR,CAAC,CACJ,CAAC;IACN,CAAC;IACL,qBAAC;AAAD,CAAC,AAhvBD,CAAoC,YAAY,GAgvB/C;AAhvBY,wCAAc;AAgvB1B,CAAC"} \ No newline at end of file diff --git a/src/system/AmqpPublisher.ts b/src/system/AmqpPublisher.ts new file mode 100644 index 0000000..bfd6dc3 --- /dev/null +++ b/src/system/AmqpPublisher.ts @@ -0,0 +1,42 @@ +/** + * Amqp Publisher + * + * Copyright (C) 2010-2019 R-T Specialty, LLC. + * + * This file is part of liza. + * + * liza is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + * Publish Amqp message to a queue + */ + +import { DeltaResult } from "../bucket/delta"; +import { Options } from 'amqplib'; + + +export interface AmqpConfig extends Options.Connect { + /** The name of a queue or exchange to publish to */ + exchange: string; +} + + +export interface AmqpPublisher +{ + /** + * Publish quote message to exchange post-rating + * + * @param delta - The delta to publish + */ + publish( delta: DeltaResult ): void; +} diff --git a/src/system/DeltaProcessor.ts b/src/system/DeltaProcessor.ts index 6103f20..67e372d 100644 --- a/src/system/DeltaProcessor.ts +++ b/src/system/DeltaProcessor.ts @@ -23,6 +23,7 @@ import { DeltaDao } from "../system/db/DeltaDao"; import { MongoDeltaType } from "../system/db/MongoDeltaDao"; import { DeltaResult } from "../bucket/delta"; import { DocumentId } from "../document/Document"; +import { AmqpPublisher } from "./AmqpPublisher"; /** @@ -36,12 +37,6 @@ export class DeltaProcessor /** The data delta type */ readonly DELTA_DATA: MongoDeltaType = 'data'; - /** A mapping of which delta type translated to which avro event */ - readonly DELTA_MAP: Record = { - DELTA_RATEDATA: 'rate', - DELTA_DATA: 'update', - }; - /** * Initialize processor @@ -49,7 +44,8 @@ export class DeltaProcessor * @param _collection Mongo collection */ constructor( - private readonly _dao: DeltaDao, + private readonly _dao: DeltaDao, + private readonly _publisher: AmqpPublisher, ) {} @@ -68,9 +64,7 @@ export class DeltaProcessor deltas.forEach( delta => { - // TODO: publish delta - // publisher.publish( delta, self.DELTA_MAP[ delta.type ] ) - console.log( delta, self.DELTA_MAP[ delta.type ] ); + self._publisher.publish( delta ); }); diff --git a/src/system/DeltaPublisher.ts b/src/system/DeltaPublisher.ts new file mode 100644 index 0000000..2606c56 --- /dev/null +++ b/src/system/DeltaPublisher.ts @@ -0,0 +1,133 @@ +/** + * Delta Publisher + * + * Copyright (C) 2010-2019 R-T Specialty, LLC. + * + * This file is part of liza. + * + * liza is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + * Publish delta message to a queue + */ + +import { AmqpPublisher } from "./AmqpPublisher"; +import { DeltaResult } from "../bucket/delta"; +import { + connect as amqpConnect, + Options, + Channel +} from 'amqplib'; + + +export interface AmqpConfig extends Options.Connect { + /** The name of a queue or exchange to publish to */ + exchange: string; +} + + +export class DeltaPublisher implements AmqpPublisher +{ + /** A mapping of which delta type translated to which avro event */ + readonly DELTA_MAP: Record = { + data: 'rate', + ratedata: 'update', + }; + + + /** + * Initialize trait + * + * @param {Object} conf AMQP configuration + * @param {DebugLog} logger logger instance + */ + constructor( + private readonly _conf: AmqpConfig, + private readonly _logger: any + ) {} + + + /** + * Publish quote message to exchange post-rating + * + * @param delta - The delta to publish + */ + publish( delta: DeltaResult ): void + { + // check both as we transition from one to the other + const exchange = this._conf.exchange; + + amqpConnect( this._conf ) + .then( conn => + { + setTimeout( () => conn.close(), 10000 ); + return conn.createChannel(); + } ) + .then( ch => { + ch.assertExchange( exchange, 'fanout', { durable: true } ); + + return this._sendMessage( ch, exchange, delta ); + } ) + .then( () => this._logger.log( + this._logger.PRIORITY_INFO, + "Published " + delta.type + " delta with timestamp '" + + delta.timestamp + "' to quote-update exchange '"+ + exchange + "'" + ) ) + .catch( e => this._logger.log( + this._logger.PRIORITY_ERROR, + "Error publishing " + delta.type + " delta with timestamp '" + + delta.timestamp + "' to quote-update exchange '"+ + exchange + "'" + ": " + e + ) ); + } + + + /** + * Send message to exchange + * + * @param channel - AMQP channel + * @param exchange - exchange name + * @param delta - The delta to publish + * + * @return whether publish was successful + */ + _sendMessage( + channel: Channel, + exchange: string, + delta: DeltaResult, + ): boolean + { + const headers = { + version: 1, + created: Date.now(), + }; + + const event_id = this.DELTA_MAP[ delta.type ]; + + const data = new Buffer( JSON.stringify( { + delta: delta, + event: event_id, + } ) ); + + // we don't use a routing key; fanout exchange + const routing_key = ''; + + return channel.publish( + exchange, + routing_key, + data, + { headers: headers }, + ); + } +} diff --git a/test/system/DeltaProcessorTest.ts b/test/system/DeltaProcessorTest.ts index 26923ae..3e70282 100644 --- a/test/system/DeltaProcessorTest.ts +++ b/test/system/DeltaProcessorTest.ts @@ -20,6 +20,7 @@ */ import { DeltaProcessor as Sut } from '../../src/system/DeltaProcessor'; +import { AmqpPublisher } from '../../src/system/AmqpPublisher'; import { DeltaDao } from '../../src/system/db/DeltaDao'; import { MongoDeltaType } from '../../src/system/db/MongoDeltaDao'; @@ -163,7 +164,11 @@ describe( 'system.DeltaProcessor', () => }, ] ).forEach( ( { given, expected, label } ) => it( label, () => { - const sut = new Sut( createMockDeltaDao() ); + const sut = new Sut( + createMockDeltaDao(), + createMockDeltaPublisher() + ); + const actual = sut.getTimestampSortedDeltas( given ); expect( actual ).to.deep.equal( expected ); @@ -280,7 +285,11 @@ describe( 'system.DeltaProcessor', () => }, ] ).forEach( ( { type, given, expected, label } ) => it( label, () => { - const sut = new Sut( createMockDeltaDao() ); + const sut = new Sut( + createMockDeltaDao(), + createMockDeltaPublisher() + ); + const actual = sut.getDeltas( given, type ); expect( actual ).to.deep.equal( expected ); @@ -297,3 +306,11 @@ function createMockDeltaDao(): DeltaDao markDocumentAsProcessed() { return this }, }; } + + +function createMockDeltaPublisher(): AmqpPublisher +{ + return { + publish() {}, + }; +} diff --git a/test/system/DeltaPublisherTest.ts b/test/system/DeltaPublisherTest.ts new file mode 100644 index 0000000..9f72cd1 --- /dev/null +++ b/test/system/DeltaPublisherTest.ts @@ -0,0 +1,49 @@ +/** + * Delta publisher test + * + * Copyright (C) 2010-2019 R-T Specialty, LLC. + * + * This file is part of the Liza Data Collection Framework. + * + * liza is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +import { + DeltaPublisher as Sut, + AmqpConfig +} from "../../src/system/DeltaPublisher"; + +import { expect, use as chai_use } from 'chai'; +chai_use( require( 'chai-as-promised' ) ); + + +describe( 'server.DeltaPublisher', () => +{ + describe( '#publish', () => + { + it( 'sends a message', () => + { + const conf = createMockConf(); + + console.log( new Sut( conf, {} ) ); + expect( true ).to.be.true + }); + }); +} ); + + +function createMockConf(): AmqpConfig +{ + return {}; +}