1
0
Fork 0

[DEV-5312] Convert most parts of RatingService Publish to TS

master
Austin Schaffer 2019-11-07 17:07:22 -05:00
parent 31a820a34f
commit c2458dee78
6 changed files with 394 additions and 114 deletions

View File

@ -51,16 +51,28 @@
"domain": "" "domain": ""
}, },
"postRatePublish": { "postRatePublish": {
"protocol": "amqp", "protocol": "amqp",
"hostname": "localhost", "hostname": "localhost",
"port": 5672, "port": 5672,
"username": "", "username": "",
"password": "", "password": "",
"locale": "en_US", "locale": "en_US",
"frameMax": 0, "frameMax": 0,
"heartbeat": 0, "heartbeat": 0,
"vhost": "/", "vhost": "/",
"queueName": "postrate" "exchange": "postrate"
},
"deltaPublish": {
"protocol": "amqp",
"hostname": "localhost",
"port": 5672,
"username": "",
"password": "",
"locale": "en_US",
"frameMax": 0,
"heartbeat": 0,
"vhost": "/",
"exchange": "quoteupdate"
} }
}, },
"c1export": { "c1export": {

View File

@ -20,6 +20,9 @@
*/ */
import { PositiveInteger } from "../../numeric";
/** /**
* Session management * Session management
*/ */
@ -31,4 +34,28 @@ export declare class UserSession
* @return true if internal user, otherwise false * @return true if internal user, otherwise false
*/ */
isInternal(): boolean; isInternal(): boolean;
/**
* Gets the agent id, if available
*
* @return agent id or undefined if unavailable
*/
agentId(): PositiveInteger | undefined
/**
* Gets the broker entity id, if available
*
* @return agent entity id or undefined if unavailable
*/
agentEntityId(): PositiveInteger | undefined
/**
* Gets the agent name, if available
*
* @return agent name or undefined if unavailable
*/
agentName(): string | undefined
} }

View File

@ -1,3 +1,4 @@
"use strict";
/** /**
* Publishes message to queue after rating * Publishes message to queue after rating
* *
@ -18,13 +19,22 @@
* You should have received a copy of the GNU General Public License * You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
var __extends = (this && this.__extends) || (function () {
'use strict'; var extendStatics = function (d, b) {
extendStatics = Object.setPrototypeOf ||
const { Interface, Trait } = require( 'easejs' ); ({ __proto__: [] } instanceof Array && function (d, b) { d.__proto__ = b; }) ||
const { RatingService } = require( './RatingService' ); function (d, b) { for (var p in b) if (b.hasOwnProperty(p)) d[p] = b[p]; };
return extendStatics(d, b);
};
return function (d, b) {
extendStatics(d, b);
function __() { this.constructor = d; }
d.prototype = b === null ? Object.create(b) : (__.prototype = b.prototype, new __());
};
})();
Object.defineProperty(exports, "__esModule", { value: true });
var RatingService_1 = require("./RatingService");
var amqplib_1 = require("amqplib");
/** /**
* Publish message to a queue after rating * Publish message to a queue after rating
* *
@ -50,35 +60,11 @@ const { RatingService } = require( './RatingService' );
* *
* See the body of `#_sendMessage' for their values. * See the body of `#_sendMessage' for their values.
*/ */
module.exports = Trait( 'RatingServicePublish' ) var RatingServicePublish = /** @class */ (function (_super) {
.implement( Interface( { 'postProcessRaterData': [] } ) ) __extends(RatingServicePublish, _super);
.extend( function RatingServicePublish() {
{ return _super !== null && _super.apply(this, arguments) || this;
/** }
* AMQP library (amqplib API)
*
* @type {amqplib}
*/
'private _amqp': null,
/**
* AMQP configuration
*
* This should be the configuration expected by amqplib's #connect. It
* should additionally contain a `queueName' field.
*
* @type {Object}
*/
'private _conf': {},
/**
* Logger
*
* @type {DebugLog}
*/
'private _log': null,
/** /**
* Initialize trait * Initialize trait
* *
@ -86,14 +72,13 @@ module.exports = Trait( 'RatingServicePublish' )
* @param {Object} conf AMQP configuration * @param {Object} conf AMQP configuration
* @param {DebugLog} logger logger instance * @param {DebugLog} logger logger instance
*/ */
__mixin( amqp, conf, logger ) RatingServicePublish.prototype.__mixin = function (
{ // constructor(
this._amqp = amqp; // private readonly _amqp: Connection,
this._conf = conf; _conf) {
this._log = logger; // super();
}, this._conf = _conf;
};
/** /**
* Publish quote message to exchange post-rating * Publish quote message to exchange post-rating
* *
@ -105,44 +90,25 @@ module.exports = Trait( 'RatingServicePublish' )
* *
* @return {undefined} * @return {undefined}
*/ */
'abstract override postProcessRaterData'( RatingServicePublish.prototype.postProcessRaterData = function (request, data, actions, program, quote) {
request, data, actions, program, quote var _this = this;
)
{
// check both as we transition from one to the other // check both as we transition from one to the other
const exchange = this._conf.exchange || this._conf.queueName; var exchange = this._conf.exchange;
amqplib_1.connect(this._conf)
this._amqp.connect( this._conf ) .then(function (conn) {
.then( conn => setTimeout(function () { return conn.close(); }, 10000);
{ return conn.createChannel();
setTimeout( () => conn.close(), 10000 ); })
return conn.createChannel(); .then(function (ch) {
} ) ch.assertExchange(exchange, 'fanout', { durable: true });
.then( ch => { return _this._sendMessage(ch, exchange, request.getSession(), quote);
ch.assertExchange( exchange, 'fanout', { durable: true } ); })
.then(function () { return _this._logger.log(_this._logger.PRIORITY_INFO, "Published quote " + quote.getId() +
return this._sendMessage( " to post-rate exchange '" + exchange + "'"); })
ch, .catch(function (e) { return _this._logger.log(_this._logger.PRIORITY_ERROR, "Post-rate exchange publish failure for quote " +
exchange, quote.getId() + ": " + e.message); });
request.getSession(), _super.prototype.postProcessRaterData.call(this, request, data, actions, program, quote);
quote };
);
} )
.then( () => this._log.log(
this._log.PRIORITY_INFO,
"Published quote " + quote.getId() +
" to post-rate exchange '" + exchange + "'"
) )
.catch( e => this._log.log(
this._log.PRIORITY_ERROR,
"Post-rate exchange publish failure for quote " +
quote.getId() + ": " + e.message
) );
this.__super( request, data, actions, program, quote );
},
/** /**
* Send message to exchange * Send message to exchange
* *
@ -151,31 +117,25 @@ module.exports = Trait( 'RatingServicePublish' )
* @param {UserSession} session user session * @param {UserSession} session user session
* @param {Quote} quote rated quote * @param {Quote} quote rated quote
* *
* @return {Promise} whether publish was successful * @return whether publish was successful
*/ */
'private _sendMessage'( channel, exchange, session, quote ) RatingServicePublish.prototype._sendMessage = function (channel, exchange, session, quote) {
{ var headers = {
const headers = {
version: 1, version: 1,
created: Date.now(), created: Date.now(),
}; };
var data = new Buffer(JSON.stringify({
const data = new Buffer( JSON.stringify( { quote_id: quote.getId(),
quote_id: quote.getId(), program_id: quote.getProgramId(),
program_id: quote.getProgramId(), agent_id: session.agentId(),
agent_id: session.agentId(), entity_id: session.agentEntityId(),
entity_id: session.agentEntityId(),
entity_name: session.agentName(), entity_name: session.agentName(),
} ) ); }));
// we don't use a routing key; fanout exchange // we don't use a routing key; fanout exchange
const routing_key = ''; var routing_key = '';
return channel.publish(exchange, routing_key, data, { headers: headers });
return channel.publish( };
exchange, return RatingServicePublish;
routing_key, }(RatingService_1.RatingService));
data, exports.RatingServicePublish = RatingServicePublish;
{ headers: headers } //# sourceMappingURL=data:application/json;base64,eyJ2ZXJzaW9uIjozLCJmaWxlIjoiUmF0aW5nU2VydmljZVB1Ymxpc2guanMiLCJzb3VyY2VSb290IjoiIiwic291cmNlcyI6WyJSYXRpbmdTZXJ2aWNlUHVibGlzaC50cyJdLCJuYW1lcyI6W10sIm1hcHBpbmdzIjoiO0FBQUE7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7R0FtQkc7Ozs7Ozs7Ozs7Ozs7OztBQUdILGlEQUFnRDtBQVFoRCxtQ0FJaUI7QUFRakI7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7OztHQXdCRztBQUNIO0lBQTBDLHdDQUFhO0lBQXZEOztJQTJIQSxDQUFDO0lBOUdHOzs7Ozs7T0FNRztJQUNILHNDQUFPLEdBQVA7SUFDQSxlQUFlO0lBQ1gsd0NBQXdDO0lBQ3hDLEtBQW1CO1FBRW5CLFdBQVc7UUFDWCxJQUFJLENBQUMsS0FBSyxHQUFHLEtBQUssQ0FBQztJQUN2QixDQUFDO0lBR0Q7Ozs7Ozs7Ozs7TUFVRTtJQUNPLG1EQUFvQixHQUE5QixVQUNLLE9BQW9CLEVBQ3BCLElBQW1CLEVBQ25CLE9BQXNCLEVBQ3RCLE9BQWdCLEVBQ2hCLEtBQXdCO1FBTDdCLGlCQXVDRTtRQS9CRyxvREFBb0Q7UUFDcEQsSUFBTSxRQUFRLEdBQUcsSUFBSSxDQUFDLEtBQUssQ0FBQyxRQUFRLENBQUM7UUFFckMsaUJBQVcsQ0FBRSxJQUFJLENBQUMsS0FBSyxDQUFFO2FBQ3BCLElBQUksQ0FBRSxVQUFBLElBQUk7WUFFUCxVQUFVLENBQUUsY0FBTSxPQUFBLElBQUksQ0FBQyxLQUFLLEVBQUUsRUFBWixDQUFZLEVBQUUsS0FBSyxDQUFFLENBQUM7WUFDeEMsT0FBTyxJQUFJLENBQUMsYUFBYSxFQUFFLENBQUM7UUFDaEMsQ0FBQyxDQUFFO2FBQ0YsSUFBSSxDQUFFLFVBQUEsRUFBRTtZQUNMLEVBQUUsQ0FBQyxjQUFjLENBQUUsUUFBUSxFQUFFLFFBQVEsRUFBRSxFQUFFLE9BQU8sRUFBRSxJQUFJLEVBQUUsQ0FBRSxDQUFDO1lBRTNELE9BQU8sS0FBSSxDQUFDLFlBQVksQ0FDcEIsRUFBRSxFQUNGLFFBQVEsRUFDUixPQUFPLENBQUMsVUFBVSxFQUFFLEVBQ3BCLEtBQUssQ0FDUixDQUFDO1FBQ04sQ0FBQyxDQUFFO2FBQ0YsSUFBSSxDQUFFLGNBQU0sT0FBQSxLQUFJLENBQUMsT0FBTyxDQUFDLEdBQUcsQ0FDekIsS0FBSSxDQUFDLE9BQU8sQ0FBQyxhQUFhLEVBQzFCLGtCQUFrQixHQUFHLEtBQUssQ0FBQyxLQUFLLEVBQUU7WUFDOUIsMEJBQTBCLEdBQUcsUUFBUSxHQUFHLEdBQUcsQ0FDbEQsRUFKWSxDQUlaLENBQUU7YUFDRixLQUFLLENBQUUsVUFBQSxDQUFDLElBQUksT0FBQSxLQUFJLENBQUMsT0FBTyxDQUFDLEdBQUcsQ0FDekIsS0FBSSxDQUFDLE9BQU8sQ0FBQyxjQUFjLEVBQzNCLCtDQUErQztZQUMzQyxLQUFLLENBQUMsS0FBSyxFQUFFLEdBQUcsSUFBSSxHQUFHLENBQUMsQ0FBQyxPQUFPLENBQ3ZDLEVBSlksQ0FJWixDQUFFLENBQUM7UUFFUixpQkFBTSxvQkFBb0IsWUFBRSxPQUFPLEVBQUUsSUFBSSxFQUFFLE9BQU8sRUFBRSxPQUFPLEVBQUUsS0FBSyxDQUFFLENBQUM7SUFDekUsQ0FBQztJQUdEOzs7Ozs7Ozs7T0FTRztJQUNILDJDQUFZLEdBQVosVUFDSSxPQUFpQixFQUNqQixRQUFnQixFQUNoQixPQUFxQixFQUNyQixLQUF5QjtRQUd6QixJQUFNLE9BQU8sR0FBRztZQUNaLE9BQU8sRUFBRSxDQUFDO1lBQ1YsT0FBTyxFQUFFLElBQUksQ0FBQyxHQUFHLEVBQUU7U0FDdEIsQ0FBQztRQUVGLElBQU0sSUFBSSxHQUFHLElBQUksTUFBTSxDQUFFLElBQUksQ0FBQyxTQUFTLENBQUU7WUFDckMsUUFBUSxFQUFLLEtBQUssQ0FBQyxLQUFLLEVBQUU7WUFDMUIsVUFBVSxFQUFHLEtBQUssQ0FBQyxZQUFZLEVBQUU7WUFDakMsUUFBUSxFQUFLLE9BQU8sQ0FBQyxPQUFPLEVBQUU7WUFDOUIsU0FBUyxFQUFJLE9BQU8sQ0FBQyxhQUFhLEVBQUU7WUFDcEMsV0FBVyxFQUFFLE9BQU8sQ0FBQyxTQUFTLEVBQUU7U0FDbkMsQ0FBRSxDQUFFLENBQUM7UUFFTiw4Q0FBOEM7UUFDOUMsSUFBTSxXQUFXLEdBQUcsRUFBRSxDQUFDO1FBRXZCLE9BQU8sT0FBTyxDQUFDLE9BQU8sQ0FDbEIsUUFBUSxFQUNSLFdBQVcsRUFDWCxJQUFJLEVBQ0osRUFBRSxPQUFPLEVBQUUsT0FBTyxFQUFFLENBQ3ZCLENBQUM7SUFDTixDQUFDO0lBQ0wsMkJBQUM7QUFBRCxDQUFDLEFBM0hELENBQTBDLDZCQUFhLEdBMkh0RDtBQTNIWSxvREFBb0IifQ==
);
},
} );

View File

@ -0,0 +1,191 @@
/**
* Publishes message to queue after rating
*
* Copyright (C) 2010-2019 R-T Specialty, LLC.
*
* This file is part of liza.
*
* liza is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
import { RatingService } from "./RatingService";
import { UserRequest } from "../request/UserRequest";
import { RateResult } from "../rater/Rater";
import { ClientActions } from "../../client/action/ClientAction";
import { Program } from "../../program/Program";
import { ServerSideQuote } from "../quote/ServerSideQuote";
import { UserSession } from "../request/UserSession";
import {
connect as amqpConnect,
Options,
Channel,
} from 'amqplib';
export interface AmqpConfig extends Options.Connect {
/** The name of a queue or exchange to publish to */
exchange: string;
}
/**
* Publish message to a queue after rating
*
* This is an initial proof-of-concept implementation. In particular, we
* have the following considerations:
*
* - A fresh connection is made for each message until we can ensure that
* we can auto-reconnect on failure;
* - This trait is not yet tested;
* - It does not use an exchange;
* - It does a poor job checking for and reporting errors.
*
* The message consists of a `version' header that is set to 1. Future
* changes to the message format will bump this version. There is also a
* `created' header holding a Unix timestamp of the moment that the message
* was created.
*
* Version 1 of the body consists of four fields:
* - quote_id
* - agent_id
* - entity_id
* - entity_name
*
* See the body of `#_sendMessage' for their values.
*/
export class RatingServicePublish extends RatingService
{
/**
* AMQP configuration
*
* This should be the configuration expected by amqplib's #connect. It
* should additionally contain a `queueName' field.
*
* @type {Object}
*/
private _conf: AmqpConfig;
/**
* Initialize trait
*
* @param {amqplib} AMQP library
* @param {Object} conf AMQP configuration
* @param {DebugLog} logger logger instance
*/
__mixin(
// constructor(
// private readonly _amqp: Connection,
_conf: AmqpConfig,
) {
// super();
this._conf = _conf;
}
/**
* Publish quote message to exchange post-rating
*
* @param {UserRequest} request user request
* @param {Object} data rating data returned
* @param {Array} actions actions to send to client
* @param {Program} program program used to perform rating
* @param {Quote} quote quote used for rating
*
* @return {undefined}
*/
protected postProcessRaterData(
request: UserRequest,
data: RateResult,
actions: ClientActions,
program: Program,
quote: ServerSideQuote,
): void
{
// check both as we transition from one to the other
const exchange = this._conf.exchange;
amqpConnect( this._conf )
.then( conn =>
{
setTimeout( () => conn.close(), 10000 );
return conn.createChannel();
} )
.then( ch => {
ch.assertExchange( exchange, 'fanout', { durable: true } );
return this._sendMessage(
ch,
exchange,
request.getSession(),
quote
);
} )
.then( () => this._logger.log(
this._logger.PRIORITY_INFO,
"Published quote " + quote.getId() +
" to post-rate exchange '" + exchange + "'"
) )
.catch( e => this._logger.log(
this._logger.PRIORITY_ERROR,
"Post-rate exchange publish failure for quote " +
quote.getId() + ": " + e.message
) );
super.postProcessRaterData( request, data, actions, program, quote );
}
/**
* Send message to exchange
*
* @param {Channel} channel AMQP channel
* @param {string} exchange exchange name
* @param {UserSession} session user session
* @param {Quote} quote rated quote
*
* @return whether publish was successful
*/
_sendMessage(
channel: Channel,
exchange: string,
session: UserSession,
quote: ServerSideQuote
): boolean
{
const headers = {
version: 1,
created: Date.now(),
};
const data = new Buffer( JSON.stringify( {
quote_id: quote.getId(),
program_id: quote.getProgramId(),
agent_id: session.agentId(),
entity_id: session.agentEntityId(),
entity_name: session.agentName(),
} ) );
// we don't use a routing key; fanout exchange
const routing_key = '';
return channel.publish(
exchange,
routing_key,
data,
{ headers: headers }
);
}
}

View File

@ -89,6 +89,96 @@ describe( 'Delta', () =>
dest_data: { foo: [ '' ] }, dest_data: { foo: [ '' ] },
expected: {}, expected: {},
}, },
{
label: "",
src_data: { foo: [ 'bar', 'baz' ] },
dest_data: { foo: [ 'change', 'baz' ] },
data: { foo: [ 'change', 'baz' ] },
diff: { foo: [ 'change' ] },
},
{
label: "",
src_data: { foo: [ 'bar', 'baz' ] },
dest_data: { foo: [ 'bar', 'change' ] },
data: { foo: [ 'bar', 'change' ] },
diff: { foo: [ , 'change' ] },
},
{
label: "",
src_data: { foo: [ 'bar', 'baz' ] },
dest_data: { foo: [ undefined, 'change' ] },
data: { foo: [ 'bar', 'change' ] },
diff: { foo: [ , 'change' ] },
},
{
label: "",
src_data: { foo: [ 'bar', 'baz' ] },
dest_data: { foo: [ undefined, 'baz' ] },
data: { foo: [ 'bar', 'baz' ] },
diff: {},
},
{
label: "",
src_data: { foo: [ 'bar', 'baz' ] },
dest_data: { foo: [ 'bar', undefined ] },
data: { foo: [ 'bar', 'baz' ] },
diff: {},
},
{
label: "",
src_data: { foo: [ 'bar', 'baz' ] },
dest_data: { foo: [ 'bar', null ] },
data: { foo: [ 'bar' ] },
diff: { foo: [ , null ] },
},
{
label: "",
src_data: { foo: [ 'bar', 'baz' ] },
dest_data: { foo: [ 'bar', 'baz', null ] },
data: { foo: [ 'bar', 'baz' ] },
diff: {},
},
{
label: "",
src_data: { foo: [ 'bar', 'baz' ] },
dest_data: { foo: [ 'bar', 'baz', 'quux' ] },
data: { foo: [ 'bar', 'baz', 'quux' ] },
diff: { foo: [ , , 'quux' ]},
},
{
label: "",
src_data: { foo: [ 'bar', 'baz' ] },
dest_data: { foo: [] },
data: { foo: [ 'bar', 'baz' ] },
diff: {},
},
// null not at end of set means unchanged
{
label: "",
src_data: { foo: [ 'bar', 'baz', 'quux' ] },
dest_data: { foo: [ null, null, 'quux' ] },
data: { foo: [ 'bar', 'baz', 'quux' ] },
diff: {},
},
// but the last one is
{
label: "",
src_data: { foo: [ 'bar', 'baz', 'quux' ] },
dest_data: { foo: [ null, 'baz', null ] },
data: { foo: [ 'bar', 'baz' ] },
diff: { foo: [ , , null ] },
},
// given a string of nulls, only the last one is terminating; the
// rest are interpreted as undefined (because JSON serializes
// undefined values to `null' -_-)
{
label: "",
src_data: { foo: [ 'bar', 'baz', 'quux' ] },
dest_data: { foo: [ null, null, null ] },
data: { foo: [ 'bar', 'baz' ] },
diff: { foo: [ , , null ] },
},
] ).forEach( ( { label, src_data, dest_data, expected } ) => ] ).forEach( ( { label, src_data, dest_data, expected } ) =>
{ {
it( label, () => it( label, () =>

View File

@ -503,7 +503,7 @@ function createStubs(
function createStubUserRequest( internal: boolean ) function createStubUserRequest( internal: boolean )
{ {
return { return <UserRequest>{
getSession: () => ( { getSession: () => ( {
isInternal: () => internal isInternal: () => internal
} ) } )