diff --git a/conf/vanilla-server.json b/conf/vanilla-server.json index 452dc4c..53d14e4 100644 --- a/conf/vanilla-server.json +++ b/conf/vanilla-server.json @@ -50,7 +50,19 @@ "host": "localhost", "domain": "" }, - "noResultsUrl": "" + "noResultsUrl": "", + "postRatePublish": { + "protocol": "amqp", + "hostname": "localhost", + "port": 5672, + "username": "", + "password": "", + "locale": "en_US", + "frameMax": 0, + "heartbeat": 0, + "vhost": "/", + "queueName": "postrate" + } }, "c1export": { "host": "localhost", diff --git a/src/server/daemon/Daemon.js b/src/server/daemon/Daemon.js index b9e2895..306fe58 100644 --- a/src/server/daemon/Daemon.js +++ b/src/server/daemon/Daemon.js @@ -114,7 +114,8 @@ module.exports = AbstractClass( 'Daemon', this._createAccessLog(), this._conf.get( 'skey' ), this._conf.get( 'services.rating.noResultsUrl' ), - ] ).then( ([ debug_log, access_log, skey, no_results_url ]) => + this._conf.get( 'services.rating.postRatePublish' ), + ] ).then( ([ debug_log, access_log, skey, no_results_url, post_rate ]) => { this._debugLog = debug_log; this._accessLog = access_log; @@ -123,7 +124,21 @@ module.exports = AbstractClass( 'Daemon', this._rater = liza.server.rater.ProcessManager(); this._encService = this.getEncryptionService(); this._memcache = this.getMemcacheClient(); - this._routers = this.getRouters( skey, no_results_url ); + + post_rate.reduce( + ( accum, value, key ) => + { + accum[ key ] = value; + return accum; + }, + {} + ).then( post_rate_publish => + this._routers = this.getRouters( + skey, + no_results_url, + post_rate_publish + ) + ); } ) .then( () => this._startDaemon() ); }, @@ -190,18 +205,24 @@ module.exports = AbstractClass( 'Daemon', * all-submit notification URL NO_RESULTS_URL if they are provided, * respectively. * - * @param {string=} skey session key - * @param {no_results_url=} no_results_url URL for all-submit notification + * @param {string=} skey session key + * @param {string=} no_results_url URL for all-submit notification + * @param {Object=} post_rate_publish configuration for post-rate messages * * @return {Object} controller */ - 'protected getProgramController': function( skey, no_results_url ) + 'protected getProgramController': function( + skey, no_results_url, post_rate_publish + ) { var controller = require( './controller' ); controller.rater = this._rater; controller.no_results_url = no_results_url || controller.no_results_url; + controller.post_rate_publish = + post_rate_publish || controller.post_rate_publish; + if ( skey ) { controller.skey = skey; @@ -291,10 +312,14 @@ module.exports = AbstractClass( 'Daemon', 'abstract protected getEncryptionService': [], - 'protected getRouters': function( skey, no_results_url ) + 'protected getRouters': function( + skey, no_results_url, post_rate_publish + ) { return [ - this.getProgramController( skey, no_results_url ), + this.getProgramController( + skey, no_results_url, post_rate_publish + ), this.getScriptsController(), this.getClientErrorController(), ]; diff --git a/src/server/daemon/controller.js b/src/server/daemon/controller.js index 4ec8b49..52f10b8 100644 --- a/src/server/daemon/controller.js +++ b/src/server/daemon/controller.js @@ -83,6 +83,7 @@ const { }, RatingService, + RatingServicePublish, RatingServiceSubmitNotify, TokenedService, TokenDao, @@ -98,6 +99,8 @@ const { store, } = require( '../..' ); +const amqplib = require( 'amqplib' ); + // read and write locks, as separate semaphores var rlock = Semaphore(), @@ -108,9 +111,10 @@ var sflag = {}; // TODO: kluge to get liza somewhat decoupled from lovullo (rating module) -exports.rater = {}; -exports.skey = ""; -exports.no_results_url = ""; +exports.rater = {}; +exports.skey = ""; +exports.no_results_url = ""; +exports.post_rate_publish = {}; exports.init = function( logger, enc_service, conf ) @@ -161,7 +165,10 @@ exports.init = function( logger, enc_service, conf ) ) : RatingService; - rating_service = RatingServiceBase( + // TODO: temporary proof-of-concept + rating_service = RatingServiceBase.use( + RatingServicePublish( amqplib, exports.post_rate_publish ) + )( logger, dao, server, exports.rater ); diff --git a/src/server/service/RatingServicePublish.js b/src/server/service/RatingServicePublish.js new file mode 100644 index 0000000..fa0ac14 --- /dev/null +++ b/src/server/service/RatingServicePublish.js @@ -0,0 +1,143 @@ +/** + * Publishes message to queue after rating + * + * Copyright (C) 2019 R-T Specialty, LLC. + * + * This file is part of liza. + * + * liza is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +'use strict'; + +const { Trait } = require( 'easejs' ); +const RatingService = require( './RatingService' ); + + +/** + * Publish message to a queue after rating + * + * This is an initial proof-of-concept implementation. In particular, we + * have the following considerations: + * + * - A fresh connection is made for each message until we can ensure that + * we can auto-reconnect on failure; + * - This trait is not yet tested; + * - It does not use an exchange; + * - It does a poor job checking for and reporting errors. + * + * The message consists of a `version' header that is set to 1. Future + * changes to the message format will bump this version. There is also a + * `created' header holding a Unix timestamp of the moment that the message + * was created. + * + * Version 1 of the body consists of four fields: + * - quote_id + * - agent_id + * - entity_id + * - entity_name + * + * See the body of `#_sendMessage' for their values. + */ +module.exports = Trait( 'RatingServicePublish' ) + .extend( RatingService, +{ + /** + * AMQP library (amqplib API) + * + * @type {amqplib} + */ + 'private _amqp': null, + + /** + * AMQP configuration + * + * This should be the configuration expected by amqplib's #connect. It + * should additionally contain a `queueName' field. + * + * @type {Object} + */ + 'private _conf': {}, + + + __mixin( amqp, conf ) + { + this._amqp = amqp; + this._conf = conf; + }, + + + /** + * Queue message post rating + * + * @param {UserRequest} request user request + * @param {Object} data rating data returned + * @param {Array} actions actions to send to client + * @param {Program} program program used to perform rating + * @param {Quote} quote quote used for rating + * + * @return {undefined} + */ + 'override protected postProcessRaterData'( + request, data, actions, program, quote + ) + { + const queue = this._conf.queueName; + + let connection = null; + + this._amqp.connect( this._conf ) + .then( conn => connection = conn.createChannel() ) + .then( ch => { + ch.assertQueue( queue, { durable: true } ); + + return this._sendMessage( + ch, + queue, + request.getSession(), + quote + ); + } ); + }, + + + /** + * Send message to queue + * + * @param {Channel} channel AMQP channel + * @param {string} queue queue name + * @param {UserSession} session user session + * @param {Quote} quote rated quote + * + * @return {Promise} whether sendToQueue was successful + */ + 'private _sendMessage'( channel, queue, session, quote ) + { + const headers = { + version: 1, + created: Date.now(), + }; + + const data = new Buffer( JSON.stringify( { + quote_id: quote.getId(), + agent_id: session.agentId(), + entity_id: session.agentEntityId(), + entity_name: session.agentName(), + } ) ); + + return Promise.resolve( + channel.sendToQueue( queue, data, { headers: headers } ) + ); + }, +} );