1
0
Fork 0

Proof-of-concept RatingServicePublish to queue post-rate

master
Mike Gerwitz 2019-03-21 14:08:35 -04:00
commit 13a7f97f8f
4 changed files with 199 additions and 12 deletions

View File

@ -50,7 +50,19 @@
"host": "localhost",
"domain": ""
},
"noResultsUrl": ""
"noResultsUrl": "",
"postRatePublish": {
"protocol": "amqp",
"hostname": "localhost",
"port": 5672,
"username": "",
"password": "",
"locale": "en_US",
"frameMax": 0,
"heartbeat": 0,
"vhost": "/",
"queueName": "postrate"
}
},
"c1export": {
"host": "localhost",

View File

@ -114,7 +114,8 @@ module.exports = AbstractClass( 'Daemon',
this._createAccessLog(),
this._conf.get( 'skey' ),
this._conf.get( 'services.rating.noResultsUrl' ),
] ).then( ([ debug_log, access_log, skey, no_results_url ]) =>
this._conf.get( 'services.rating.postRatePublish' ),
] ).then( ([ debug_log, access_log, skey, no_results_url, post_rate ]) =>
{
this._debugLog = debug_log;
this._accessLog = access_log;
@ -123,7 +124,21 @@ module.exports = AbstractClass( 'Daemon',
this._rater = liza.server.rater.ProcessManager();
this._encService = this.getEncryptionService();
this._memcache = this.getMemcacheClient();
this._routers = this.getRouters( skey, no_results_url );
post_rate.reduce(
( accum, value, key ) =>
{
accum[ key ] = value;
return accum;
},
{}
).then( post_rate_publish =>
this._routers = this.getRouters(
skey,
no_results_url,
post_rate_publish
)
);
} )
.then( () => this._startDaemon() );
},
@ -190,18 +205,24 @@ module.exports = AbstractClass( 'Daemon',
* all-submit notification URL NO_RESULTS_URL if they are provided,
* respectively.
*
* @param {string=} skey session key
* @param {no_results_url=} no_results_url URL for all-submit notification
* @param {string=} skey session key
* @param {string=} no_results_url URL for all-submit notification
* @param {Object=} post_rate_publish configuration for post-rate messages
*
* @return {Object} controller
*/
'protected getProgramController': function( skey, no_results_url )
'protected getProgramController': function(
skey, no_results_url, post_rate_publish
)
{
var controller = require( './controller' );
controller.rater = this._rater;
controller.no_results_url = no_results_url || controller.no_results_url;
controller.post_rate_publish =
post_rate_publish || controller.post_rate_publish;
if ( skey )
{
controller.skey = skey;
@ -291,10 +312,14 @@ module.exports = AbstractClass( 'Daemon',
'abstract protected getEncryptionService': [],
'protected getRouters': function( skey, no_results_url )
'protected getRouters': function(
skey, no_results_url, post_rate_publish
)
{
return [
this.getProgramController( skey, no_results_url ),
this.getProgramController(
skey, no_results_url, post_rate_publish
),
this.getScriptsController(),
this.getClientErrorController(),
];

View File

@ -83,6 +83,7 @@ const {
},
RatingService,
RatingServicePublish,
RatingServiceSubmitNotify,
TokenedService,
TokenDao,
@ -98,6 +99,8 @@ const {
store,
} = require( '../..' );
const amqplib = require( 'amqplib' );
// read and write locks, as separate semaphores
var rlock = Semaphore(),
@ -108,9 +111,10 @@ var sflag = {};
// TODO: kluge to get liza somewhat decoupled from lovullo (rating module)
exports.rater = {};
exports.skey = "";
exports.no_results_url = "";
exports.rater = {};
exports.skey = "";
exports.no_results_url = "";
exports.post_rate_publish = {};
exports.init = function( logger, enc_service, conf )
@ -161,7 +165,10 @@ exports.init = function( logger, enc_service, conf )
)
: RatingService;
rating_service = RatingServiceBase(
// TODO: temporary proof-of-concept
rating_service = RatingServiceBase.use(
RatingServicePublish( amqplib, exports.post_rate_publish )
)(
logger, dao, server, exports.rater
);

View File

@ -0,0 +1,143 @@
/**
* Publishes message to queue after rating
*
* Copyright (C) 2019 R-T Specialty, LLC.
*
* This file is part of liza.
*
* liza is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
'use strict';
const { Trait } = require( 'easejs' );
const RatingService = require( './RatingService' );
/**
* Publish message to a queue after rating
*
* This is an initial proof-of-concept implementation. In particular, we
* have the following considerations:
*
* - A fresh connection is made for each message until we can ensure that
* we can auto-reconnect on failure;
* - This trait is not yet tested;
* - It does not use an exchange;
* - It does a poor job checking for and reporting errors.
*
* The message consists of a `version' header that is set to 1. Future
* changes to the message format will bump this version. There is also a
* `created' header holding a Unix timestamp of the moment that the message
* was created.
*
* Version 1 of the body consists of four fields:
* - quote_id
* - agent_id
* - entity_id
* - entity_name
*
* See the body of `#_sendMessage' for their values.
*/
module.exports = Trait( 'RatingServicePublish' )
.extend( RatingService,
{
/**
* AMQP library (amqplib API)
*
* @type {amqplib}
*/
'private _amqp': null,
/**
* AMQP configuration
*
* This should be the configuration expected by amqplib's #connect. It
* should additionally contain a `queueName' field.
*
* @type {Object}
*/
'private _conf': {},
__mixin( amqp, conf )
{
this._amqp = amqp;
this._conf = conf;
},
/**
* Queue message post rating
*
* @param {UserRequest} request user request
* @param {Object} data rating data returned
* @param {Array} actions actions to send to client
* @param {Program} program program used to perform rating
* @param {Quote} quote quote used for rating
*
* @return {undefined}
*/
'override protected postProcessRaterData'(
request, data, actions, program, quote
)
{
const queue = this._conf.queueName;
let connection = null;
this._amqp.connect( this._conf )
.then( conn => connection = conn.createChannel() )
.then( ch => {
ch.assertQueue( queue, { durable: true } );
return this._sendMessage(
ch,
queue,
request.getSession(),
quote
);
} );
},
/**
* Send message to queue
*
* @param {Channel} channel AMQP channel
* @param {string} queue queue name
* @param {UserSession} session user session
* @param {Quote} quote rated quote
*
* @return {Promise} whether sendToQueue was successful
*/
'private _sendMessage'( channel, queue, session, quote )
{
const headers = {
version: 1,
created: Date.now(),
};
const data = new Buffer( JSON.stringify( {
quote_id: quote.getId(),
agent_id: session.agentId(),
entity_id: session.agentEntityId(),
entity_name: session.agentName(),
} ) );
return Promise.resolve(
channel.sendToQueue( queue, data, { headers: headers } )
);
},
} );