1
0
Fork 0

RatingServicePublish: Publish to {queue=>exchange}

This now publishes to a fanout exchange instead of a queue, which allows
consumers to handle their own queue configuration.

This also adds some basic logging that was missing from the first version.

Note that I still don't consider this to be production-quality code; it's
missing tests, and there's still notes that need to be addressed.
master
Mike Gerwitz 2019-04-03 10:42:26 -04:00
parent aedf8ceed8
commit 90802edc03
2 changed files with 50 additions and 18 deletions

View File

@ -139,7 +139,7 @@ exports.init = function( logger, enc_service, conf )
// TODO: temporary proof-of-concept
rating_service = RatingService.use(
RatingServicePublish( amqplib, exports.post_rate_publish )
RatingServicePublish( amqplib, exports.post_rate_publish, logger )
)(
logger, dao, server, exports.rater
);

View File

@ -70,16 +70,31 @@ module.exports = Trait( 'RatingServicePublish' )
*/
'private _conf': {},
/**
* Logger
*
* @type {DebugLog}
*/
'private _logger': null,
__mixin( amqp, conf )
/**
* Initialize trait
*
* @param {amqplib} AMQP library
* @param {Object} conf AMQP configuration
* @param {DebugLog} logger logger instance
*/
__mixin( amqp, conf, logger )
{
this._amqp = amqp;
this._conf = conf;
this._amqp = amqp;
this._conf = conf;
this._logger = logger;
},
/**
* Queue message post rating
* Publish quote message to exchange post-rating
*
* @param {UserRequest} request user request
* @param {Object} data rating data returned
@ -93,7 +108,8 @@ module.exports = Trait( 'RatingServicePublish' )
request, data, actions, program, quote
)
{
const queue = this._conf.queueName;
// check both as we transition from one to the other
const exchange = this._conf.exchange || this._conf.queueName;
this._amqp.connect( this._conf )
.then( conn =>
@ -102,31 +118,41 @@ module.exports = Trait( 'RatingServicePublish' )
return conn.createChannel();
} )
.then( ch => {
ch.assertQueue( queue, { durable: true } );
ch.assertExchange( exchange, 'fanout', { durable: true } );
return this._sendMessage(
ch,
queue,
exchange,
request.getSession(),
quote
);
} );
} )
.then( () => this._logger.log(
this._logger.PRIORITY_INFO,
"Published quote " + quote.getId() +
" to post-rate exchange '" + exchange + "'"
) )
.catch( e => this._logger.log(
this._logger.PRIORITY_ERROR,
"Post-rate exchange publish failure for quote " +
quote.getId() + ": " + e.message
) );
this.__super( request, data, actions, program, quote );
},
/**
* Send message to queue
* Send message to exchange
*
* @param {Channel} channel AMQP channel
* @param {string} queue queue name
* @param {UserSession} session user session
* @param {Quote} quote rated quote
* @param {Channel} channel AMQP channel
* @param {string} exchange exchange name
* @param {UserSession} session user session
* @param {Quote} quote rated quote
*
* @return {Promise} whether sendToQueue was successful
* @return {Promise} whether publish was successful
*/
'private _sendMessage'( channel, queue, session, quote )
'private _sendMessage'( channel, exchange, session, quote )
{
const headers = {
version: 1,
@ -141,8 +167,14 @@ module.exports = Trait( 'RatingServicePublish' )
entity_name: session.agentName(),
} ) );
return Promise.resolve(
channel.sendToQueue( queue, data, { headers: headers } )
// we don't use a routing key; fanout exchange
const routing_key = '';
return channel.publish(
exchange,
routing_key,
data,
{ headers: headers }
);
},
} );