diff --git a/conf/vanilla-server.json b/conf/vanilla-server.json index da222bb..4f79ad6 100644 --- a/conf/vanilla-server.json +++ b/conf/vanilla-server.json @@ -51,16 +51,28 @@ "domain": "" }, "postRatePublish": { - "protocol": "amqp", - "hostname": "localhost", - "port": 5672, - "username": "", - "password": "", - "locale": "en_US", - "frameMax": 0, - "heartbeat": 0, - "vhost": "/", - "queueName": "postrate" + "protocol": "amqp", + "hostname": "localhost", + "port": 5672, + "username": "", + "password": "", + "locale": "en_US", + "frameMax": 0, + "heartbeat": 0, + "vhost": "/", + "exchange": "postrate" + }, + "deltaPublish": { + "protocol": "amqp", + "hostname": "localhost", + "port": 5672, + "username": "", + "password": "", + "locale": "en_US", + "frameMax": 0, + "heartbeat": 0, + "vhost": "/", + "exchange": "quoteupdate" } }, "c1export": { diff --git a/src/server/request/UserSession.d.ts b/src/server/request/UserSession.d.ts index 01937d7..f7d08ae 100644 --- a/src/server/request/UserSession.d.ts +++ b/src/server/request/UserSession.d.ts @@ -20,6 +20,9 @@ */ +import { PositiveInteger } from "../../numeric"; + + /** * Session management */ @@ -31,4 +34,28 @@ export declare class UserSession * @return true if internal user, otherwise false */ isInternal(): boolean; + + + /** + * Gets the agent id, if available + * + * @return agent id or undefined if unavailable + */ + agentId(): PositiveInteger | undefined + + + /** + * Gets the broker entity id, if available + * + * @return agent entity id or undefined if unavailable + */ + agentEntityId(): PositiveInteger | undefined + + + /** + * Gets the agent name, if available + * + * @return agent name or undefined if unavailable + */ + agentName(): string | undefined } diff --git a/src/server/service/RatingServicePublish.js b/src/server/service/RatingServicePublish.js index ce843f0..2ce2d98 100644 --- a/src/server/service/RatingServicePublish.js +++ b/src/server/service/RatingServicePublish.js @@ -1,3 +1,4 @@ +"use strict"; /** * Publishes message to queue after rating * @@ -18,13 +19,22 @@ * You should have received a copy of the GNU General Public License * along with this program. If not, see . */ - -'use strict'; - -const { Interface, Trait } = require( 'easejs' ); -const { RatingService } = require( './RatingService' ); - - +var __extends = (this && this.__extends) || (function () { + var extendStatics = function (d, b) { + extendStatics = Object.setPrototypeOf || + ({ __proto__: [] } instanceof Array && function (d, b) { d.__proto__ = b; }) || + function (d, b) { for (var p in b) if (b.hasOwnProperty(p)) d[p] = b[p]; }; + return extendStatics(d, b); + }; + return function (d, b) { + extendStatics(d, b); + function __() { this.constructor = d; } + d.prototype = b === null ? Object.create(b) : (__.prototype = b.prototype, new __()); + }; +})(); +Object.defineProperty(exports, "__esModule", { value: true }); +var RatingService_1 = require("./RatingService"); +var amqplib_1 = require("amqplib"); /** * Publish message to a queue after rating * @@ -50,35 +60,11 @@ const { RatingService } = require( './RatingService' ); * * See the body of `#_sendMessage' for their values. */ -module.exports = Trait( 'RatingServicePublish' ) - .implement( Interface( { 'postProcessRaterData': [] } ) ) - .extend( -{ - /** - * AMQP library (amqplib API) - * - * @type {amqplib} - */ - 'private _amqp': null, - - /** - * AMQP configuration - * - * This should be the configuration expected by amqplib's #connect. It - * should additionally contain a `queueName' field. - * - * @type {Object} - */ - 'private _conf': {}, - - /** - * Logger - * - * @type {DebugLog} - */ - 'private _log': null, - - +var RatingServicePublish = /** @class */ (function (_super) { + __extends(RatingServicePublish, _super); + function RatingServicePublish() { + return _super !== null && _super.apply(this, arguments) || this; + } /** * Initialize trait * @@ -86,14 +72,13 @@ module.exports = Trait( 'RatingServicePublish' ) * @param {Object} conf AMQP configuration * @param {DebugLog} logger logger instance */ - __mixin( amqp, conf, logger ) - { - this._amqp = amqp; - this._conf = conf; - this._log = logger; - }, - - + RatingServicePublish.prototype.__mixin = function ( + // constructor( + // private readonly _amqp: Connection, + _conf) { + // super(); + this._conf = _conf; + }; /** * Publish quote message to exchange post-rating * @@ -105,44 +90,25 @@ module.exports = Trait( 'RatingServicePublish' ) * * @return {undefined} */ - 'abstract override postProcessRaterData'( - request, data, actions, program, quote - ) - { + RatingServicePublish.prototype.postProcessRaterData = function (request, data, actions, program, quote) { + var _this = this; // check both as we transition from one to the other - const exchange = this._conf.exchange || this._conf.queueName; - - this._amqp.connect( this._conf ) - .then( conn => - { - setTimeout( () => conn.close(), 10000 ); - return conn.createChannel(); - } ) - .then( ch => { - ch.assertExchange( exchange, 'fanout', { durable: true } ); - - return this._sendMessage( - ch, - exchange, - request.getSession(), - quote - ); - } ) - .then( () => this._log.log( - this._log.PRIORITY_INFO, - "Published quote " + quote.getId() + - " to post-rate exchange '" + exchange + "'" - ) ) - .catch( e => this._log.log( - this._log.PRIORITY_ERROR, - "Post-rate exchange publish failure for quote " + - quote.getId() + ": " + e.message - ) ); - - this.__super( request, data, actions, program, quote ); - }, - - + var exchange = this._conf.exchange; + amqplib_1.connect(this._conf) + .then(function (conn) { + setTimeout(function () { return conn.close(); }, 10000); + return conn.createChannel(); + }) + .then(function (ch) { + ch.assertExchange(exchange, 'fanout', { durable: true }); + return _this._sendMessage(ch, exchange, request.getSession(), quote); + }) + .then(function () { return _this._logger.log(_this._logger.PRIORITY_INFO, "Published quote " + quote.getId() + + " to post-rate exchange '" + exchange + "'"); }) + .catch(function (e) { return _this._logger.log(_this._logger.PRIORITY_ERROR, "Post-rate exchange publish failure for quote " + + quote.getId() + ": " + e.message); }); + _super.prototype.postProcessRaterData.call(this, request, data, actions, program, quote); + }; /** * Send message to exchange * @@ -151,31 +117,25 @@ module.exports = Trait( 'RatingServicePublish' ) * @param {UserSession} session user session * @param {Quote} quote rated quote * - * @return {Promise} whether publish was successful + * @return whether publish was successful */ - 'private _sendMessage'( channel, exchange, session, quote ) - { - const headers = { + RatingServicePublish.prototype._sendMessage = function (channel, exchange, session, quote) { + var headers = { version: 1, created: Date.now(), }; - - const data = new Buffer( JSON.stringify( { - quote_id: quote.getId(), - program_id: quote.getProgramId(), - agent_id: session.agentId(), - entity_id: session.agentEntityId(), + var data = new Buffer(JSON.stringify({ + quote_id: quote.getId(), + program_id: quote.getProgramId(), + agent_id: session.agentId(), + entity_id: session.agentEntityId(), entity_name: session.agentName(), - } ) ); - + })); // we don't use a routing key; fanout exchange - const routing_key = ''; - - return channel.publish( - exchange, - routing_key, - data, - { headers: headers } - ); - }, -} ); + var routing_key = ''; + return channel.publish(exchange, routing_key, data, { headers: headers }); + }; + return RatingServicePublish; +}(RatingService_1.RatingService)); +exports.RatingServicePublish = RatingServicePublish; +//# sourceMappingURL=data:application/json;base64,eyJ2ZXJzaW9uIjozLCJmaWxlIjoiUmF0aW5nU2VydmljZVB1Ymxpc2guanMiLCJzb3VyY2VSb290IjoiIiwic291cmNlcyI6WyJSYXRpbmdTZXJ2aWNlUHVibGlzaC50cyJdLCJuYW1lcyI6W10sIm1hcHBpbmdzIjoiO0FBQUE7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7R0FtQkc7Ozs7Ozs7Ozs7Ozs7OztBQUdILGlEQUFnRDtBQVFoRCxtQ0FJaUI7QUFRakI7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7OztHQXdCRztBQUNIO0lBQTBDLHdDQUFhO0lBQXZEOztJQTJIQSxDQUFDO0lBOUdHOzs7Ozs7T0FNRztJQUNILHNDQUFPLEdBQVA7SUFDQSxlQUFlO0lBQ1gsd0NBQXdDO0lBQ3hDLEtBQW1CO1FBRW5CLFdBQVc7UUFDWCxJQUFJLENBQUMsS0FBSyxHQUFHLEtBQUssQ0FBQztJQUN2QixDQUFDO0lBR0Q7Ozs7Ozs7Ozs7TUFVRTtJQUNPLG1EQUFvQixHQUE5QixVQUNLLE9BQW9CLEVBQ3BCLElBQW1CLEVBQ25CLE9BQXNCLEVBQ3RCLE9BQWdCLEVBQ2hCLEtBQXdCO1FBTDdCLGlCQXVDRTtRQS9CRyxvREFBb0Q7UUFDcEQsSUFBTSxRQUFRLEdBQUcsSUFBSSxDQUFDLEtBQUssQ0FBQyxRQUFRLENBQUM7UUFFckMsaUJBQVcsQ0FBRSxJQUFJLENBQUMsS0FBSyxDQUFFO2FBQ3BCLElBQUksQ0FBRSxVQUFBLElBQUk7WUFFUCxVQUFVLENBQUUsY0FBTSxPQUFBLElBQUksQ0FBQyxLQUFLLEVBQUUsRUFBWixDQUFZLEVBQUUsS0FBSyxDQUFFLENBQUM7WUFDeEMsT0FBTyxJQUFJLENBQUMsYUFBYSxFQUFFLENBQUM7UUFDaEMsQ0FBQyxDQUFFO2FBQ0YsSUFBSSxDQUFFLFVBQUEsRUFBRTtZQUNMLEVBQUUsQ0FBQyxjQUFjLENBQUUsUUFBUSxFQUFFLFFBQVEsRUFBRSxFQUFFLE9BQU8sRUFBRSxJQUFJLEVBQUUsQ0FBRSxDQUFDO1lBRTNELE9BQU8sS0FBSSxDQUFDLFlBQVksQ0FDcEIsRUFBRSxFQUNGLFFBQVEsRUFDUixPQUFPLENBQUMsVUFBVSxFQUFFLEVBQ3BCLEtBQUssQ0FDUixDQUFDO1FBQ04sQ0FBQyxDQUFFO2FBQ0YsSUFBSSxDQUFFLGNBQU0sT0FBQSxLQUFJLENBQUMsT0FBTyxDQUFDLEdBQUcsQ0FDekIsS0FBSSxDQUFDLE9BQU8sQ0FBQyxhQUFhLEVBQzFCLGtCQUFrQixHQUFHLEtBQUssQ0FBQyxLQUFLLEVBQUU7WUFDOUIsMEJBQTBCLEdBQUcsUUFBUSxHQUFHLEdBQUcsQ0FDbEQsRUFKWSxDQUlaLENBQUU7YUFDRixLQUFLLENBQUUsVUFBQSxDQUFDLElBQUksT0FBQSxLQUFJLENBQUMsT0FBTyxDQUFDLEdBQUcsQ0FDekIsS0FBSSxDQUFDLE9BQU8sQ0FBQyxjQUFjLEVBQzNCLCtDQUErQztZQUMzQyxLQUFLLENBQUMsS0FBSyxFQUFFLEdBQUcsSUFBSSxHQUFHLENBQUMsQ0FBQyxPQUFPLENBQ3ZDLEVBSlksQ0FJWixDQUFFLENBQUM7UUFFUixpQkFBTSxvQkFBb0IsWUFBRSxPQUFPLEVBQUUsSUFBSSxFQUFFLE9BQU8sRUFBRSxPQUFPLEVBQUUsS0FBSyxDQUFFLENBQUM7SUFDekUsQ0FBQztJQUdEOzs7Ozs7Ozs7T0FTRztJQUNILDJDQUFZLEdBQVosVUFDSSxPQUFpQixFQUNqQixRQUFnQixFQUNoQixPQUFxQixFQUNyQixLQUF5QjtRQUd6QixJQUFNLE9BQU8sR0FBRztZQUNaLE9BQU8sRUFBRSxDQUFDO1lBQ1YsT0FBTyxFQUFFLElBQUksQ0FBQyxHQUFHLEVBQUU7U0FDdEIsQ0FBQztRQUVGLElBQU0sSUFBSSxHQUFHLElBQUksTUFBTSxDQUFFLElBQUksQ0FBQyxTQUFTLENBQUU7WUFDckMsUUFBUSxFQUFLLEtBQUssQ0FBQyxLQUFLLEVBQUU7WUFDMUIsVUFBVSxFQUFHLEtBQUssQ0FBQyxZQUFZLEVBQUU7WUFDakMsUUFBUSxFQUFLLE9BQU8sQ0FBQyxPQUFPLEVBQUU7WUFDOUIsU0FBUyxFQUFJLE9BQU8sQ0FBQyxhQUFhLEVBQUU7WUFDcEMsV0FBVyxFQUFFLE9BQU8sQ0FBQyxTQUFTLEVBQUU7U0FDbkMsQ0FBRSxDQUFFLENBQUM7UUFFTiw4Q0FBOEM7UUFDOUMsSUFBTSxXQUFXLEdBQUcsRUFBRSxDQUFDO1FBRXZCLE9BQU8sT0FBTyxDQUFDLE9BQU8sQ0FDbEIsUUFBUSxFQUNSLFdBQVcsRUFDWCxJQUFJLEVBQ0osRUFBRSxPQUFPLEVBQUUsT0FBTyxFQUFFLENBQ3ZCLENBQUM7SUFDTixDQUFDO0lBQ0wsMkJBQUM7QUFBRCxDQUFDLEFBM0hELENBQTBDLDZCQUFhLEdBMkh0RDtBQTNIWSxvREFBb0IifQ== \ No newline at end of file diff --git a/src/server/service/RatingServicePublish.ts b/src/server/service/RatingServicePublish.ts new file mode 100644 index 0000000..1966759 --- /dev/null +++ b/src/server/service/RatingServicePublish.ts @@ -0,0 +1,191 @@ +/** + * Publishes message to queue after rating + * + * Copyright (C) 2010-2019 R-T Specialty, LLC. + * + * This file is part of liza. + * + * liza is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + + +import { RatingService } from "./RatingService"; +import { UserRequest } from "../request/UserRequest"; +import { RateResult } from "../rater/Rater"; +import { ClientActions } from "../../client/action/ClientAction"; +import { Program } from "../../program/Program"; +import { ServerSideQuote } from "../quote/ServerSideQuote"; +import { UserSession } from "../request/UserSession"; + +import { + connect as amqpConnect, + Options, + Channel, +} from 'amqplib'; + + +export interface AmqpConfig extends Options.Connect { + /** The name of a queue or exchange to publish to */ + exchange: string; +} + +/** + * Publish message to a queue after rating + * + * This is an initial proof-of-concept implementation. In particular, we + * have the following considerations: + * + * - A fresh connection is made for each message until we can ensure that + * we can auto-reconnect on failure; + * - This trait is not yet tested; + * - It does not use an exchange; + * - It does a poor job checking for and reporting errors. + * + * The message consists of a `version' header that is set to 1. Future + * changes to the message format will bump this version. There is also a + * `created' header holding a Unix timestamp of the moment that the message + * was created. + * + * Version 1 of the body consists of four fields: + * - quote_id + * - agent_id + * - entity_id + * - entity_name + * + * See the body of `#_sendMessage' for their values. + */ +export class RatingServicePublish extends RatingService +{ + /** + * AMQP configuration + * + * This should be the configuration expected by amqplib's #connect. It + * should additionally contain a `queueName' field. + * + * @type {Object} + */ + private _conf: AmqpConfig; + + + /** + * Initialize trait + * + * @param {amqplib} AMQP library + * @param {Object} conf AMQP configuration + * @param {DebugLog} logger logger instance + */ + __mixin( + // constructor( + // private readonly _amqp: Connection, + _conf: AmqpConfig, + ) { + // super(); + this._conf = _conf; + } + + + /** + * Publish quote message to exchange post-rating + * + * @param {UserRequest} request user request + * @param {Object} data rating data returned + * @param {Array} actions actions to send to client + * @param {Program} program program used to perform rating + * @param {Quote} quote quote used for rating + * + * @return {undefined} + */ + protected postProcessRaterData( + request: UserRequest, + data: RateResult, + actions: ClientActions, + program: Program, + quote: ServerSideQuote, + ): void + { + // check both as we transition from one to the other + const exchange = this._conf.exchange; + + amqpConnect( this._conf ) + .then( conn => + { + setTimeout( () => conn.close(), 10000 ); + return conn.createChannel(); + } ) + .then( ch => { + ch.assertExchange( exchange, 'fanout', { durable: true } ); + + return this._sendMessage( + ch, + exchange, + request.getSession(), + quote + ); + } ) + .then( () => this._logger.log( + this._logger.PRIORITY_INFO, + "Published quote " + quote.getId() + + " to post-rate exchange '" + exchange + "'" + ) ) + .catch( e => this._logger.log( + this._logger.PRIORITY_ERROR, + "Post-rate exchange publish failure for quote " + + quote.getId() + ": " + e.message + ) ); + + super.postProcessRaterData( request, data, actions, program, quote ); + } + + + /** + * Send message to exchange + * + * @param {Channel} channel AMQP channel + * @param {string} exchange exchange name + * @param {UserSession} session user session + * @param {Quote} quote rated quote + * + * @return whether publish was successful + */ + _sendMessage( + channel: Channel, + exchange: string, + session: UserSession, + quote: ServerSideQuote + ): boolean + { + const headers = { + version: 1, + created: Date.now(), + }; + + const data = new Buffer( JSON.stringify( { + quote_id: quote.getId(), + program_id: quote.getProgramId(), + agent_id: session.agentId(), + entity_id: session.agentEntityId(), + entity_name: session.agentName(), + } ) ); + + // we don't use a routing key; fanout exchange + const routing_key = ''; + + return channel.publish( + exchange, + routing_key, + data, + { headers: headers } + ); + } +} diff --git a/test/bucket/delta.ts b/test/bucket/delta.ts index ba1d192..a729f29 100644 --- a/test/bucket/delta.ts +++ b/test/bucket/delta.ts @@ -89,6 +89,96 @@ describe( 'Delta', () => dest_data: { foo: [ '' ] }, expected: {}, }, + { + label: "", + src_data: { foo: [ 'bar', 'baz' ] }, + dest_data: { foo: [ 'change', 'baz' ] }, + data: { foo: [ 'change', 'baz' ] }, + diff: { foo: [ 'change' ] }, + }, + { + label: "", + src_data: { foo: [ 'bar', 'baz' ] }, + dest_data: { foo: [ 'bar', 'change' ] }, + data: { foo: [ 'bar', 'change' ] }, + diff: { foo: [ , 'change' ] }, + }, + { + label: "", + src_data: { foo: [ 'bar', 'baz' ] }, + dest_data: { foo: [ undefined, 'change' ] }, + data: { foo: [ 'bar', 'change' ] }, + diff: { foo: [ , 'change' ] }, + }, + { + label: "", + src_data: { foo: [ 'bar', 'baz' ] }, + dest_data: { foo: [ undefined, 'baz' ] }, + data: { foo: [ 'bar', 'baz' ] }, + diff: {}, + }, + { + label: "", + src_data: { foo: [ 'bar', 'baz' ] }, + dest_data: { foo: [ 'bar', undefined ] }, + data: { foo: [ 'bar', 'baz' ] }, + diff: {}, + }, + { + label: "", + src_data: { foo: [ 'bar', 'baz' ] }, + dest_data: { foo: [ 'bar', null ] }, + data: { foo: [ 'bar' ] }, + diff: { foo: [ , null ] }, + }, + { + label: "", + src_data: { foo: [ 'bar', 'baz' ] }, + dest_data: { foo: [ 'bar', 'baz', null ] }, + data: { foo: [ 'bar', 'baz' ] }, + diff: {}, + }, + { + label: "", + src_data: { foo: [ 'bar', 'baz' ] }, + dest_data: { foo: [ 'bar', 'baz', 'quux' ] }, + data: { foo: [ 'bar', 'baz', 'quux' ] }, + diff: { foo: [ , , 'quux' ]}, + }, + { + label: "", + src_data: { foo: [ 'bar', 'baz' ] }, + dest_data: { foo: [] }, + data: { foo: [ 'bar', 'baz' ] }, + diff: {}, + }, + + // null not at end of set means unchanged + { + label: "", + src_data: { foo: [ 'bar', 'baz', 'quux' ] }, + dest_data: { foo: [ null, null, 'quux' ] }, + data: { foo: [ 'bar', 'baz', 'quux' ] }, + diff: {}, + }, + // but the last one is + { + label: "", + src_data: { foo: [ 'bar', 'baz', 'quux' ] }, + dest_data: { foo: [ null, 'baz', null ] }, + data: { foo: [ 'bar', 'baz' ] }, + diff: { foo: [ , , null ] }, + }, + // given a string of nulls, only the last one is terminating; the + // rest are interpreted as undefined (because JSON serializes + // undefined values to `null' -_-) + { + label: "", + src_data: { foo: [ 'bar', 'baz', 'quux' ] }, + dest_data: { foo: [ null, null, null ] }, + data: { foo: [ 'bar', 'baz' ] }, + diff: { foo: [ , , null ] }, + }, ] ).forEach( ( { label, src_data, dest_data, expected } ) => { it( label, () => diff --git a/test/server/request/DataProcessorTest.ts b/test/server/request/DataProcessorTest.ts index d0e52d8..a3a4d0e 100644 --- a/test/server/request/DataProcessorTest.ts +++ b/test/server/request/DataProcessorTest.ts @@ -503,7 +503,7 @@ function createStubs( function createStubUserRequest( internal: boolean ) { - return { + return { getSession: () => ( { isInternal: () => internal } )