From b8801c039f91ccc7517498eb2326e04bc6421ea6 Mon Sep 17 00:00:00 2001 From: Mike Gerwitz Date: Wed, 20 Mar 2019 16:42:47 -0400 Subject: [PATCH] RatingServicePublish: New trait (mostly proof-of-concept) This is unfortunately not production-ready code, but we need to get something out there in the meantime. The RatingServicePublish's docblock mentions some of the shortcomings, which will be addressed in the near future. There is also more documentation to come once we settle on an implementation. DEV-4400 --- conf/vanilla-server.json | 14 +- src/server/daemon/Daemon.js | 39 +++++- src/server/daemon/controller.js | 15 ++- src/server/service/RatingServicePublish.js | 143 +++++++++++++++++++++ 4 files changed, 199 insertions(+), 12 deletions(-) create mode 100644 src/server/service/RatingServicePublish.js diff --git a/conf/vanilla-server.json b/conf/vanilla-server.json index 452dc4c..53d14e4 100644 --- a/conf/vanilla-server.json +++ b/conf/vanilla-server.json @@ -50,7 +50,19 @@ "host": "localhost", "domain": "" }, - "noResultsUrl": "" + "noResultsUrl": "", + "postRatePublish": { + "protocol": "amqp", + "hostname": "localhost", + "port": 5672, + "username": "", + "password": "", + "locale": "en_US", + "frameMax": 0, + "heartbeat": 0, + "vhost": "/", + "queueName": "postrate" + } }, "c1export": { "host": "localhost", diff --git a/src/server/daemon/Daemon.js b/src/server/daemon/Daemon.js index b9e2895..306fe58 100644 --- a/src/server/daemon/Daemon.js +++ b/src/server/daemon/Daemon.js @@ -114,7 +114,8 @@ module.exports = AbstractClass( 'Daemon', this._createAccessLog(), this._conf.get( 'skey' ), this._conf.get( 'services.rating.noResultsUrl' ), - ] ).then( ([ debug_log, access_log, skey, no_results_url ]) => + this._conf.get( 'services.rating.postRatePublish' ), + ] ).then( ([ debug_log, access_log, skey, no_results_url, post_rate ]) => { this._debugLog = debug_log; this._accessLog = access_log; @@ -123,7 +124,21 @@ module.exports = AbstractClass( 'Daemon', this._rater = liza.server.rater.ProcessManager(); this._encService = this.getEncryptionService(); this._memcache = this.getMemcacheClient(); - this._routers = this.getRouters( skey, no_results_url ); + + post_rate.reduce( + ( accum, value, key ) => + { + accum[ key ] = value; + return accum; + }, + {} + ).then( post_rate_publish => + this._routers = this.getRouters( + skey, + no_results_url, + post_rate_publish + ) + ); } ) .then( () => this._startDaemon() ); }, @@ -190,18 +205,24 @@ module.exports = AbstractClass( 'Daemon', * all-submit notification URL NO_RESULTS_URL if they are provided, * respectively. * - * @param {string=} skey session key - * @param {no_results_url=} no_results_url URL for all-submit notification + * @param {string=} skey session key + * @param {string=} no_results_url URL for all-submit notification + * @param {Object=} post_rate_publish configuration for post-rate messages * * @return {Object} controller */ - 'protected getProgramController': function( skey, no_results_url ) + 'protected getProgramController': function( + skey, no_results_url, post_rate_publish + ) { var controller = require( './controller' ); controller.rater = this._rater; controller.no_results_url = no_results_url || controller.no_results_url; + controller.post_rate_publish = + post_rate_publish || controller.post_rate_publish; + if ( skey ) { controller.skey = skey; @@ -291,10 +312,14 @@ module.exports = AbstractClass( 'Daemon', 'abstract protected getEncryptionService': [], - 'protected getRouters': function( skey, no_results_url ) + 'protected getRouters': function( + skey, no_results_url, post_rate_publish + ) { return [ - this.getProgramController( skey, no_results_url ), + this.getProgramController( + skey, no_results_url, post_rate_publish + ), this.getScriptsController(), this.getClientErrorController(), ]; diff --git a/src/server/daemon/controller.js b/src/server/daemon/controller.js index 4ec8b49..52f10b8 100644 --- a/src/server/daemon/controller.js +++ b/src/server/daemon/controller.js @@ -83,6 +83,7 @@ const { }, RatingService, + RatingServicePublish, RatingServiceSubmitNotify, TokenedService, TokenDao, @@ -98,6 +99,8 @@ const { store, } = require( '../..' ); +const amqplib = require( 'amqplib' ); + // read and write locks, as separate semaphores var rlock = Semaphore(), @@ -108,9 +111,10 @@ var sflag = {}; // TODO: kluge to get liza somewhat decoupled from lovullo (rating module) -exports.rater = {}; -exports.skey = ""; -exports.no_results_url = ""; +exports.rater = {}; +exports.skey = ""; +exports.no_results_url = ""; +exports.post_rate_publish = {}; exports.init = function( logger, enc_service, conf ) @@ -161,7 +165,10 @@ exports.init = function( logger, enc_service, conf ) ) : RatingService; - rating_service = RatingServiceBase( + // TODO: temporary proof-of-concept + rating_service = RatingServiceBase.use( + RatingServicePublish( amqplib, exports.post_rate_publish ) + )( logger, dao, server, exports.rater ); diff --git a/src/server/service/RatingServicePublish.js b/src/server/service/RatingServicePublish.js new file mode 100644 index 0000000..fa0ac14 --- /dev/null +++ b/src/server/service/RatingServicePublish.js @@ -0,0 +1,143 @@ +/** + * Publishes message to queue after rating + * + * Copyright (C) 2019 R-T Specialty, LLC. + * + * This file is part of liza. + * + * liza is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +'use strict'; + +const { Trait } = require( 'easejs' ); +const RatingService = require( './RatingService' ); + + +/** + * Publish message to a queue after rating + * + * This is an initial proof-of-concept implementation. In particular, we + * have the following considerations: + * + * - A fresh connection is made for each message until we can ensure that + * we can auto-reconnect on failure; + * - This trait is not yet tested; + * - It does not use an exchange; + * - It does a poor job checking for and reporting errors. + * + * The message consists of a `version' header that is set to 1. Future + * changes to the message format will bump this version. There is also a + * `created' header holding a Unix timestamp of the moment that the message + * was created. + * + * Version 1 of the body consists of four fields: + * - quote_id + * - agent_id + * - entity_id + * - entity_name + * + * See the body of `#_sendMessage' for their values. + */ +module.exports = Trait( 'RatingServicePublish' ) + .extend( RatingService, +{ + /** + * AMQP library (amqplib API) + * + * @type {amqplib} + */ + 'private _amqp': null, + + /** + * AMQP configuration + * + * This should be the configuration expected by amqplib's #connect. It + * should additionally contain a `queueName' field. + * + * @type {Object} + */ + 'private _conf': {}, + + + __mixin( amqp, conf ) + { + this._amqp = amqp; + this._conf = conf; + }, + + + /** + * Queue message post rating + * + * @param {UserRequest} request user request + * @param {Object} data rating data returned + * @param {Array} actions actions to send to client + * @param {Program} program program used to perform rating + * @param {Quote} quote quote used for rating + * + * @return {undefined} + */ + 'override protected postProcessRaterData'( + request, data, actions, program, quote + ) + { + const queue = this._conf.queueName; + + let connection = null; + + this._amqp.connect( this._conf ) + .then( conn => connection = conn.createChannel() ) + .then( ch => { + ch.assertQueue( queue, { durable: true } ); + + return this._sendMessage( + ch, + queue, + request.getSession(), + quote + ); + } ); + }, + + + /** + * Send message to queue + * + * @param {Channel} channel AMQP channel + * @param {string} queue queue name + * @param {UserSession} session user session + * @param {Quote} quote rated quote + * + * @return {Promise} whether sendToQueue was successful + */ + 'private _sendMessage'( channel, queue, session, quote ) + { + const headers = { + version: 1, + created: Date.now(), + }; + + const data = new Buffer( JSON.stringify( { + quote_id: quote.getId(), + agent_id: session.agentId(), + entity_id: session.agentEntityId(), + entity_name: session.agentName(), + } ) ); + + return Promise.resolve( + channel.sendToQueue( queue, data, { headers: headers } ) + ); + }, +} );