diff --git a/src/server/daemon/controller.js b/src/server/daemon/controller.js index 0bcb076..5bc9ce4 100644 --- a/src/server/daemon/controller.js +++ b/src/server/daemon/controller.js @@ -139,7 +139,7 @@ exports.init = function( logger, enc_service, conf ) // TODO: temporary proof-of-concept rating_service = RatingService.use( - RatingServicePublish( amqplib, exports.post_rate_publish ) + RatingServicePublish( amqplib, exports.post_rate_publish, logger ) )( logger, dao, server, exports.rater ); diff --git a/src/server/service/RatingServicePublish.js b/src/server/service/RatingServicePublish.js index 7e1c2e0..44dd151 100644 --- a/src/server/service/RatingServicePublish.js +++ b/src/server/service/RatingServicePublish.js @@ -70,16 +70,31 @@ module.exports = Trait( 'RatingServicePublish' ) */ 'private _conf': {}, + /** + * Logger + * + * @type {DebugLog} + */ + 'private _logger': null, - __mixin( amqp, conf ) + + /** + * Initialize trait + * + * @param {amqplib} AMQP library + * @param {Object} conf AMQP configuration + * @param {DebugLog} logger logger instance + */ + __mixin( amqp, conf, logger ) { - this._amqp = amqp; - this._conf = conf; + this._amqp = amqp; + this._conf = conf; + this._logger = logger; }, /** - * Queue message post rating + * Publish quote message to exchange post-rating * * @param {UserRequest} request user request * @param {Object} data rating data returned @@ -93,7 +108,8 @@ module.exports = Trait( 'RatingServicePublish' ) request, data, actions, program, quote ) { - const queue = this._conf.queueName; + // check both as we transition from one to the other + const exchange = this._conf.exchange || this._conf.queueName; this._amqp.connect( this._conf ) .then( conn => @@ -102,31 +118,41 @@ module.exports = Trait( 'RatingServicePublish' ) return conn.createChannel(); } ) .then( ch => { - ch.assertQueue( queue, { durable: true } ); + ch.assertExchange( exchange, 'fanout', { durable: true } ); return this._sendMessage( ch, - queue, + exchange, request.getSession(), quote ); - } ); + } ) + .then( () => this._logger.log( + this._logger.PRIORITY_INFO, + "Published quote " + quote.getId() + + " to post-rate exchange '" + exchange + "'" + ) ) + .catch( e => this._logger.log( + this._logger.PRIORITY_ERROR, + "Post-rate exchange publish failure for quote " + + quote.getId() + ": " + e.message + ) ); this.__super( request, data, actions, program, quote ); }, /** - * Send message to queue + * Send message to exchange * - * @param {Channel} channel AMQP channel - * @param {string} queue queue name - * @param {UserSession} session user session - * @param {Quote} quote rated quote + * @param {Channel} channel AMQP channel + * @param {string} exchange exchange name + * @param {UserSession} session user session + * @param {Quote} quote rated quote * - * @return {Promise} whether sendToQueue was successful + * @return {Promise} whether publish was successful */ - 'private _sendMessage'( channel, queue, session, quote ) + 'private _sendMessage'( channel, exchange, session, quote ) { const headers = { version: 1, @@ -141,8 +167,14 @@ module.exports = Trait( 'RatingServicePublish' ) entity_name: session.agentName(), } ) ); - return Promise.resolve( - channel.sendToQueue( queue, data, { headers: headers } ) + // we don't use a routing key; fanout exchange + const routing_key = ''; + + return channel.publish( + exchange, + routing_key, + data, + { headers: headers } ); }, } );