1
0
Fork 0

[DEV-5312] Add interface for amqp publisher and implement a delta publisher

master
Austin Schaffer 2019-11-12 16:41:31 -05:00
parent de94f69e8f
commit c5733d1dff
7 changed files with 249 additions and 495 deletions

View File

@ -38,7 +38,8 @@
"mocha": "5.2.0", "mocha": "5.2.0",
"@types/mocha": "5.2.0", "@types/mocha": "5.2.0",
"sinon": ">=1.17.4", "sinon": ">=1.17.4",
"es6-promise": "~3" "es6-promise": "~3",
"@types/amqplib": "0.5.13"
}, },
"licenses": [ "licenses": [

File diff suppressed because one or more lines are too long

View File

@ -0,0 +1,42 @@
/**
* Amqp Publisher
*
* Copyright (C) 2010-2019 R-T Specialty, LLC.
*
* This file is part of liza.
*
* liza is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* Publish Amqp message to a queue
*/
import { DeltaResult } from "../bucket/delta";
import { Options } from 'amqplib';
export interface AmqpConfig extends Options.Connect {
/** The name of a queue or exchange to publish to */
exchange: string;
}
export interface AmqpPublisher
{
/**
* Publish quote message to exchange post-rating
*
* @param delta - The delta to publish
*/
publish( delta: DeltaResult<any> ): void;
}

View File

@ -23,6 +23,7 @@ import { DeltaDao } from "../system/db/DeltaDao";
import { MongoDeltaType } from "../system/db/MongoDeltaDao"; import { MongoDeltaType } from "../system/db/MongoDeltaDao";
import { DeltaResult } from "../bucket/delta"; import { DeltaResult } from "../bucket/delta";
import { DocumentId } from "../document/Document"; import { DocumentId } from "../document/Document";
import { AmqpPublisher } from "./AmqpPublisher";
/** /**
@ -36,12 +37,6 @@ export class DeltaProcessor
/** The data delta type */ /** The data delta type */
readonly DELTA_DATA: MongoDeltaType = 'data'; readonly DELTA_DATA: MongoDeltaType = 'data';
/** A mapping of which delta type translated to which avro event */
readonly DELTA_MAP: Record<string, string> = {
DELTA_RATEDATA: 'rate',
DELTA_DATA: 'update',
};
/** /**
* Initialize processor * Initialize processor
@ -49,7 +44,8 @@ export class DeltaProcessor
* @param _collection Mongo collection * @param _collection Mongo collection
*/ */
constructor( constructor(
private readonly _dao: DeltaDao, private readonly _dao: DeltaDao,
private readonly _publisher: AmqpPublisher,
) {} ) {}
@ -68,9 +64,7 @@ export class DeltaProcessor
deltas.forEach( delta => { deltas.forEach( delta => {
// TODO: publish delta self._publisher.publish( delta );
// publisher.publish( delta, self.DELTA_MAP[ delta.type ] )
console.log( delta, self.DELTA_MAP[ delta.type ] );
}); });

View File

@ -0,0 +1,133 @@
/**
* Delta Publisher
*
* Copyright (C) 2010-2019 R-T Specialty, LLC.
*
* This file is part of liza.
*
* liza is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* Publish delta message to a queue
*/
import { AmqpPublisher } from "./AmqpPublisher";
import { DeltaResult } from "../bucket/delta";
import {
connect as amqpConnect,
Options,
Channel
} from 'amqplib';
export interface AmqpConfig extends Options.Connect {
/** The name of a queue or exchange to publish to */
exchange: string;
}
export class DeltaPublisher implements AmqpPublisher
{
/** A mapping of which delta type translated to which avro event */
readonly DELTA_MAP: Record<string, string> = {
data: 'rate',
ratedata: 'update',
};
/**
* Initialize trait
*
* @param {Object} conf AMQP configuration
* @param {DebugLog} logger logger instance
*/
constructor(
private readonly _conf: AmqpConfig,
private readonly _logger: any
) {}
/**
* Publish quote message to exchange post-rating
*
* @param delta - The delta to publish
*/
publish( delta: DeltaResult<any> ): void
{
// check both as we transition from one to the other
const exchange = this._conf.exchange;
amqpConnect( this._conf )
.then( conn =>
{
setTimeout( () => conn.close(), 10000 );
return conn.createChannel();
} )
.then( ch => {
ch.assertExchange( exchange, 'fanout', { durable: true } );
return this._sendMessage( ch, exchange, delta );
} )
.then( () => this._logger.log(
this._logger.PRIORITY_INFO,
"Published " + delta.type + " delta with timestamp '" +
delta.timestamp + "' to quote-update exchange '"+
exchange + "'"
) )
.catch( e => this._logger.log(
this._logger.PRIORITY_ERROR,
"Error publishing " + delta.type + " delta with timestamp '" +
delta.timestamp + "' to quote-update exchange '"+
exchange + "'" + ": " + e
) );
}
/**
* Send message to exchange
*
* @param channel - AMQP channel
* @param exchange - exchange name
* @param delta - The delta to publish
*
* @return whether publish was successful
*/
_sendMessage(
channel: Channel,
exchange: string,
delta: DeltaResult<any>,
): boolean
{
const headers = {
version: 1,
created: Date.now(),
};
const event_id = this.DELTA_MAP[ delta.type ];
const data = new Buffer( JSON.stringify( {
delta: delta,
event: event_id,
} ) );
// we don't use a routing key; fanout exchange
const routing_key = '';
return channel.publish(
exchange,
routing_key,
data,
{ headers: headers },
);
}
}

View File

@ -20,6 +20,7 @@
*/ */
import { DeltaProcessor as Sut } from '../../src/system/DeltaProcessor'; import { DeltaProcessor as Sut } from '../../src/system/DeltaProcessor';
import { AmqpPublisher } from '../../src/system/AmqpPublisher';
import { DeltaDao } from '../../src/system/db/DeltaDao'; import { DeltaDao } from '../../src/system/db/DeltaDao';
import { MongoDeltaType } from '../../src/system/db/MongoDeltaDao'; import { MongoDeltaType } from '../../src/system/db/MongoDeltaDao';
@ -163,7 +164,11 @@ describe( 'system.DeltaProcessor', () =>
}, },
] ).forEach( ( { given, expected, label } ) => it( label, () => ] ).forEach( ( { given, expected, label } ) => it( label, () =>
{ {
const sut = new Sut( createMockDeltaDao() ); const sut = new Sut(
createMockDeltaDao(),
createMockDeltaPublisher()
);
const actual = sut.getTimestampSortedDeltas( given ); const actual = sut.getTimestampSortedDeltas( given );
expect( actual ).to.deep.equal( expected ); expect( actual ).to.deep.equal( expected );
@ -280,7 +285,11 @@ describe( 'system.DeltaProcessor', () =>
}, },
] ).forEach( ( { type, given, expected, label } ) => it( label, () => ] ).forEach( ( { type, given, expected, label } ) => it( label, () =>
{ {
const sut = new Sut( createMockDeltaDao() ); const sut = new Sut(
createMockDeltaDao(),
createMockDeltaPublisher()
);
const actual = sut.getDeltas( given, type ); const actual = sut.getDeltas( given, type );
expect( actual ).to.deep.equal( expected ); expect( actual ).to.deep.equal( expected );
@ -297,3 +306,11 @@ function createMockDeltaDao(): DeltaDao
markDocumentAsProcessed() { return this }, markDocumentAsProcessed() { return this },
}; };
} }
function createMockDeltaPublisher(): AmqpPublisher
{
return <AmqpPublisher>{
publish() {},
};
}

View File

@ -0,0 +1,49 @@
/**
* Delta publisher test
*
* Copyright (C) 2010-2019 R-T Specialty, LLC.
*
* This file is part of the Liza Data Collection Framework.
*
* liza is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
import {
DeltaPublisher as Sut,
AmqpConfig
} from "../../src/system/DeltaPublisher";
import { expect, use as chai_use } from 'chai';
chai_use( require( 'chai-as-promised' ) );
describe( 'server.DeltaPublisher', () =>
{
describe( '#publish', () =>
{
it( 'sends a message', () =>
{
const conf = createMockConf();
console.log( new Sut( conf, {} ) );
expect( true ).to.be.true
});
});
} );
function createMockConf(): AmqpConfig
{
return <AmqpConfig>{};
}