1
0
Fork 0

RatingServicePublish: New trait (mostly proof-of-concept)

This is unfortunately not production-ready code, but we need to get
something out there in the meantime.  The RatingServicePublish's docblock
mentions some of the shortcomings, which will be addressed in the near
future.

There is also more documentation to come once we settle on an implementation.

DEV-4400
master
Mike Gerwitz 2019-03-20 16:42:47 -04:00
parent 814b0ff3a0
commit b8801c039f
4 changed files with 199 additions and 12 deletions

View File

@ -50,7 +50,19 @@
"host": "localhost",
"domain": ""
},
"noResultsUrl": ""
"noResultsUrl": "",
"postRatePublish": {
"protocol": "amqp",
"hostname": "localhost",
"port": 5672,
"username": "",
"password": "",
"locale": "en_US",
"frameMax": 0,
"heartbeat": 0,
"vhost": "/",
"queueName": "postrate"
}
},
"c1export": {
"host": "localhost",

View File

@ -114,7 +114,8 @@ module.exports = AbstractClass( 'Daemon',
this._createAccessLog(),
this._conf.get( 'skey' ),
this._conf.get( 'services.rating.noResultsUrl' ),
] ).then( ([ debug_log, access_log, skey, no_results_url ]) =>
this._conf.get( 'services.rating.postRatePublish' ),
] ).then( ([ debug_log, access_log, skey, no_results_url, post_rate ]) =>
{
this._debugLog = debug_log;
this._accessLog = access_log;
@ -123,7 +124,21 @@ module.exports = AbstractClass( 'Daemon',
this._rater = liza.server.rater.ProcessManager();
this._encService = this.getEncryptionService();
this._memcache = this.getMemcacheClient();
this._routers = this.getRouters( skey, no_results_url );
post_rate.reduce(
( accum, value, key ) =>
{
accum[ key ] = value;
return accum;
},
{}
).then( post_rate_publish =>
this._routers = this.getRouters(
skey,
no_results_url,
post_rate_publish
)
);
} )
.then( () => this._startDaemon() );
},
@ -190,18 +205,24 @@ module.exports = AbstractClass( 'Daemon',
* all-submit notification URL NO_RESULTS_URL if they are provided,
* respectively.
*
* @param {string=} skey session key
* @param {no_results_url=} no_results_url URL for all-submit notification
* @param {string=} skey session key
* @param {string=} no_results_url URL for all-submit notification
* @param {Object=} post_rate_publish configuration for post-rate messages
*
* @return {Object} controller
*/
'protected getProgramController': function( skey, no_results_url )
'protected getProgramController': function(
skey, no_results_url, post_rate_publish
)
{
var controller = require( './controller' );
controller.rater = this._rater;
controller.no_results_url = no_results_url || controller.no_results_url;
controller.post_rate_publish =
post_rate_publish || controller.post_rate_publish;
if ( skey )
{
controller.skey = skey;
@ -291,10 +312,14 @@ module.exports = AbstractClass( 'Daemon',
'abstract protected getEncryptionService': [],
'protected getRouters': function( skey, no_results_url )
'protected getRouters': function(
skey, no_results_url, post_rate_publish
)
{
return [
this.getProgramController( skey, no_results_url ),
this.getProgramController(
skey, no_results_url, post_rate_publish
),
this.getScriptsController(),
this.getClientErrorController(),
];

View File

@ -83,6 +83,7 @@ const {
},
RatingService,
RatingServicePublish,
RatingServiceSubmitNotify,
TokenedService,
TokenDao,
@ -98,6 +99,8 @@ const {
store,
} = require( '../..' );
const amqplib = require( 'amqplib' );
// read and write locks, as separate semaphores
var rlock = Semaphore(),
@ -108,9 +111,10 @@ var sflag = {};
// TODO: kluge to get liza somewhat decoupled from lovullo (rating module)
exports.rater = {};
exports.skey = "";
exports.no_results_url = "";
exports.rater = {};
exports.skey = "";
exports.no_results_url = "";
exports.post_rate_publish = {};
exports.init = function( logger, enc_service, conf )
@ -161,7 +165,10 @@ exports.init = function( logger, enc_service, conf )
)
: RatingService;
rating_service = RatingServiceBase(
// TODO: temporary proof-of-concept
rating_service = RatingServiceBase.use(
RatingServicePublish( amqplib, exports.post_rate_publish )
)(
logger, dao, server, exports.rater
);

View File

@ -0,0 +1,143 @@
/**
* Publishes message to queue after rating
*
* Copyright (C) 2019 R-T Specialty, LLC.
*
* This file is part of liza.
*
* liza is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
'use strict';
const { Trait } = require( 'easejs' );
const RatingService = require( './RatingService' );
/**
* Publish message to a queue after rating
*
* This is an initial proof-of-concept implementation. In particular, we
* have the following considerations:
*
* - A fresh connection is made for each message until we can ensure that
* we can auto-reconnect on failure;
* - This trait is not yet tested;
* - It does not use an exchange;
* - It does a poor job checking for and reporting errors.
*
* The message consists of a `version' header that is set to 1. Future
* changes to the message format will bump this version. There is also a
* `created' header holding a Unix timestamp of the moment that the message
* was created.
*
* Version 1 of the body consists of four fields:
* - quote_id
* - agent_id
* - entity_id
* - entity_name
*
* See the body of `#_sendMessage' for their values.
*/
module.exports = Trait( 'RatingServicePublish' )
.extend( RatingService,
{
/**
* AMQP library (amqplib API)
*
* @type {amqplib}
*/
'private _amqp': null,
/**
* AMQP configuration
*
* This should be the configuration expected by amqplib's #connect. It
* should additionally contain a `queueName' field.
*
* @type {Object}
*/
'private _conf': {},
__mixin( amqp, conf )
{
this._amqp = amqp;
this._conf = conf;
},
/**
* Queue message post rating
*
* @param {UserRequest} request user request
* @param {Object} data rating data returned
* @param {Array} actions actions to send to client
* @param {Program} program program used to perform rating
* @param {Quote} quote quote used for rating
*
* @return {undefined}
*/
'override protected postProcessRaterData'(
request, data, actions, program, quote
)
{
const queue = this._conf.queueName;
let connection = null;
this._amqp.connect( this._conf )
.then( conn => connection = conn.createChannel() )
.then( ch => {
ch.assertQueue( queue, { durable: true } );
return this._sendMessage(
ch,
queue,
request.getSession(),
quote
);
} );
},
/**
* Send message to queue
*
* @param {Channel} channel AMQP channel
* @param {string} queue queue name
* @param {UserSession} session user session
* @param {Quote} quote rated quote
*
* @return {Promise} whether sendToQueue was successful
*/
'private _sendMessage'( channel, queue, session, quote )
{
const headers = {
version: 1,
created: Date.now(),
};
const data = new Buffer( JSON.stringify( {
quote_id: quote.getId(),
agent_id: session.agentId(),
entity_id: session.agentEntityId(),
entity_name: session.agentName(),
} ) );
return Promise.resolve(
channel.sendToQueue( queue, data, { headers: headers } )
);
},
} );