From 4d22107a57d0941efeba48d1d1f99d086c1c85c8 Mon Sep 17 00:00:00 2001 From: Mike Gerwitz Date: Thu, 6 Jul 2017 14:21:06 -0400 Subject: [PATCH] Create ProcessManager out of rater/service * src/server/daemon/Daemon.js (__construct): Reference new class. * src/server/rater/ProcessManager.js: Add class (extracted from service). * src/server/rater/service.js: Refactored into ProcessManager. --- src/server/daemon/Daemon.js | 2 +- src/server/rater/ProcessManager.js | 290 +++++++++++++++++++++++++++++ src/server/rater/service.js | 261 -------------------------- 3 files changed, 291 insertions(+), 262 deletions(-) create mode 100644 src/server/rater/ProcessManager.js delete mode 100644 src/server/rater/service.js diff --git a/src/server/daemon/Daemon.js b/src/server/daemon/Daemon.js index 7e13f95..f361a22 100644 --- a/src/server/daemon/Daemon.js +++ b/src/server/daemon/Daemon.js @@ -100,7 +100,7 @@ module.exports = AbstractClass( 'Daemon', { this._httpPort = http_port; - this._rater = liza.server.rater.service; + this._rater = liza.server.rater.ProcessManager(); this._httpServer = this.getHttpServer(); this._accessLog = this._createAccessLog(); this._debugLog = this._createDebugLog( log_priority ); diff --git a/src/server/rater/ProcessManager.js b/src/server/rater/ProcessManager.js new file mode 100644 index 0000000..dab4b4c --- /dev/null +++ b/src/server/rater/ProcessManager.js @@ -0,0 +1,290 @@ +/** + * Rating process manager + * + * Copyright (C) 2017 R-T Specialty, LLC. + * + * This file is part of the Liza Data Collection Framework. + * + * liza is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * @todo decouple insurance terminology + */ + +'use strict'; + +const { Class } = require( 'easejs' ); +const child_process = require( 'child_process' ); + +// POSIX signal numbers +const _signum = { + SIGHUP: 1, + SIGINT: 2, + SIGQUIT: 3, + SIGILL: 4, + SIGTRAP: 5, + SIGABRT: 6, + SIGIOT: 6, + SIGBUS: 7, + SIGFPE: 8, + SIGKILL: 9, + SIGUSR1: 10, + SIGSEGV: 11, + SIGUSR2: 12, + SIGPIPE: 13, + SIGALRM: 14, + SIGTERM: 15, + SIGSTKFLT: 16, + SIGCHLD: 17, + SIGCONT: 18, + SIGSTOP: 19, + SIGTSTP: 20, + SIGTTIN: 21, + SIGTTOU: 22, + SIGURG: 23, + SIGXCPU: 24, + SIGXFSZ: 25, + SIGVTALARM: 26, + SIGPROF: 27, + SIGWINCH: 28, + SIGIO: 29, + SIGPOLL: 29, + SIGPWR: 30, + SIGSYS: 31, +}; + + +/** + * Manage rating process + * + * Handles formatting and sending requests to the rating process; and + * processing replies. + */ +module.exports = Class( 'ProcessManager', +{ + /** + * Pending requests indexed by id + * @type {Object} + */ + 'private _requests': {}, + + /** + * Rating process + * @type {ChildProcess} + */ + 'private _child': null, + + + /** + * Start rating process + * + * @param {function(string)} logc logging callback + * @param {function(string,string)} errc error callback (message and stack) + * + * @return {undefined} + */ + 'public init'( logc, errc ) + { + if ( this._child !== null ) + { + // end the child + this._child.kill( 'SIGHUP' ); + return; + } + + this._child = child_process.fork( + __dirname + '/process.js', + [], + + // pass all our arguments to the child, incrementing the debug + // port if --debug was provided + { + env: process.env, + execArgv: process.execArgv.map( arg => + { + const debug = arg.match( /^--debug(?:=(\d+))?$/ ); + + if ( debug === null ) + { + return arg; + } + + // our debug port will be our parent's plus one + const parent_port = +debug[ 1 ] || 5858, + new_port = parent_port + 1; + + return '--debug=' + new_port; + } ), + } + ); + + this._child.on( 'message', msg => + { + var cmd = msg.cmd; + + switch ( cmd ) + { + case 'log': + logc( msg.msg ); + break; + + case 'error': + errc( msg.msg, msg.stack ); + break; + + case 'rate-reply': + this._rateReply( msg, errc ); + break; + + default: + errc( "Unknown message from rater process: " + msg.cmd ); + } + } ); + + this._child.on( 'exit', ( excode, sig ) => + { + this._child = null; + + // c'mon node...use the POSIX exit status + if ( ( excode == null ) && sig ) + { + excode = this._getSignum( sig ); + } + + if ( excode !== 0 ) + { + errc( + "Rater process exited with error code " + excode + + ( ( sig ) ? " (" + sig + ")" : '' ) + ); + + this._purgeRateRequests( "Rater process died unexpectedly" ); + } + else + { + logc( "Rater process exited gracefully." ); + } + + // purge anything remaining in the queue (hopefully nothing; this is + // purely a catch-all in case a request somehow snuck in due to a bug) + this._purgeRateRequests( "Rater process is being restarted." ); + + // start a new process + logc( "Restarting rater process..." ); + exports.init( logc, errc ); + } ); + }, + + + /** + * Returns the rater associated with the given id + * + * @param {string} id rater id + * + * @return {Object|null} requested rater or null if it does not exist + */ + 'public byId'( id ) + { + // temporary until refactoring is complete + return { rate: ( quote, session, indv, success, failure ) => + { + var rqid = this._createRqid( quote.getId(), indv ); + this._requests[ rqid ] = [ success, failure ]; + + this._child.send( { + cmd: 'rate', + supplier: id, + indv: indv, + rqid: rqid, + + quote: { + id: quote.getId(), + agentId: quote.getAgentId(), + agentName: quote.getAgentName(), + data: quote.getBucket().getData(), + + creditScoreRef: quote.getCreditScoreRef(), + }, + + agentId: session.agentId(), + internal: session.isInternal(), + } ); + } }; + }, + + + 'private _rateReply'( msg, errc ) + { + var rqid = msg.rqid, + rq = this._requests[ rqid ]; + + // does this request exits? + if ( !rq ) + { + // uh...beaver? + errc( "Reply to unknown rqid: " + rqid ); + return; + } + + // remove the rqid from the pending request list; we now hold the only + // reference + delete this._requests[ rqid ]; + + var success = rq[ 0 ], + failure = rq[ 1 ]; + + // if we did not rate succesfully, abort + if ( msg.status !== 'ok' ) + { + failure( msg.msg ); + return; + } + + if ( !msg.data ) + { + failure( "Rater indicated success, but no data was returned" ); + return; + } + + // that's right; who da man (or wo-man)? + success( msg.data, ( msg.actions || [] ) ); + }, + + + 'private _purgeRateRequests'( msg ) + { + // this is never a good thing + for ( var rqid in this._requests ) + { + // invoke failure function + this._requests[ rqid ][ 1 ]( msg ); + } + + // clear out request references + this._requests = {}; + }, + + + 'private _getSignum'( sig ) + { + return 128 + ( _signum[ sig ] || 0 ); + }, + + + 'private _createRqid'( qid, indv ) + { + return qid + '_' + + ( indv ? indv + '_' : '' ) + + ( new Date() ).getTime(); + }, +} ); diff --git a/src/server/rater/service.js b/src/server/rater/service.js deleted file mode 100644 index cb36943..0000000 --- a/src/server/rater/service.js +++ /dev/null @@ -1,261 +0,0 @@ -/** - * Rating service - * - * Copyright (C) 2017 R-T Specialty, LLC. - * - * This file is part of the Liza Data Collection Framework. - * - * liza is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - * - * @todo decouple insurance terminology - */ - -var child_process = require( 'child_process' ), - child = null; - -// POSIX signal numbers -const _signum = { - SIGHUP: 1, - SIGINT: 2, - SIGQUIT: 3, - SIGILL: 4, - SIGTRAP: 5, - SIGABRT: 6, - SIGIOT: 6, - SIGBUS: 7, - SIGFPE: 8, - SIGKILL: 9, - SIGUSR1: 10, - SIGSEGV: 11, - SIGUSR2: 12, - SIGPIPE: 13, - SIGALRM: 14, - SIGTERM: 15, - SIGSTKFLT: 16, - SIGCHLD: 17, - SIGCONT: 18, - SIGSTOP: 19, - SIGTSTP: 20, - SIGTTIN: 21, - SIGTTOU: 22, - SIGURG: 23, - SIGXCPU: 24, - SIGXFSZ: 25, - SIGVTALARM: 26, - SIGPROF: 27, - SIGWINCH: 28, - SIGIO: 29, - SIGPOLL: 29, - SIGPWR: 30, - SIGSYS: 31, -}; - - -exports.init = function( logc, errc ) -{ - if ( child !== null ) - { - // end the child - child.kill( 'SIGHUP' ); - return; - } - - child = child_process.fork( - __dirname + '/process.js', - [], - - // pass all our arguments to the child, incrementing the debug - // port if --debug was provided - { - env: process.env, - execArgv: process.execArgv.map( arg => - { - const debug = arg.match( /^--debug(?:=(\d+))?$/ ); - - if ( debug === null ) - { - return arg; - } - - // our debug port will be our parent's plus one - const parent_port = +debug[ 1 ] || 5858, - new_port = parent_port + 1; - - return '--debug=' + new_port; - } ), - } - ); - - child.on( 'message', function( msg ) - { - var cmd = msg.cmd; - - switch ( cmd ) - { - case 'log': - logc( msg.msg ); - break; - - case 'error': - errc( msg.msg, msg.stack ); - break; - - case 'rate-reply': - rateReply( msg, errc ); - break; - - default: - errc( "Unknown message from rater process: " + msg.cmd ); - } - } ); - - child.on( 'exit', function( excode, sig ) - { - child = null; - - // c'mon node...use the POSIX exit status - if ( ( excode == null ) && sig ) - { - excode = getsignum( sig ); - } - - if ( excode !== 0 ) - { - errc( - "Rater process exited with error code " + excode + - ( ( sig ) ? " (" + sig + ")" : '' ) - ); - - purgeRateRequests( "Rater process died unexpectedly" ); - } - else - { - logc( "Rater process exited gracefully." ); - } - - // purge anything remaining in the queue (hopefully nothing; this is - // purely a catch-all in case a request somehow snuck in due to a bug) - purgeRateRequests( "Rater process is being restarted." ); - - // start a new process - logc( "Restarting rater process..." ); - exports.init( logc, errc ); - } ); -}; - - -var _requests = {}; - -/** - * Returns the rater associated with the given id - * - * @param {string} id rater id - * - * @return {Object|null} requested rater or null if it does not exist - */ -exports.byId = function( id ) -{ - // temporary until refactoring is complete - return { rate: function( quote, session, indv, success, failure ) - { - var rqid = createRqid( quote.getId(), indv ); - _requests[ rqid ] = [ success, failure ]; - - child.send( { - cmd: 'rate', - supplier: id, - indv: indv, - rqid: rqid, - - quote: { - id: quote.getId(), - agentId: quote.getAgentId(), - agentName: quote.getAgentName(), - data: quote.getBucket().getData(), - - creditScoreRef: quote.getCreditScoreRef(), - }, - - agentId: session.agentId(), - internal: session.isInternal(), - } ); - } }; -} - - -function rateReply( msg, errc ) -{ - var rqid = msg.rqid, - rq = _requests[ rqid ]; - - // does this request exits? - if ( !rq ) - { - // uh...beaver? - errc( "Reply to unknown rqid: " + rqid ); - return; - } - - // remove the rqid from the pending request list; we now hold the only - // reference - delete _requests[ rqid ]; - - var success = rq[ 0 ], - failure = rq[ 1 ]; - - // if we did not rate succesfully, abort - if ( msg.status !== 'ok' ) - { - failure( msg.msg ); - return; - } - - if ( !msg.data ) - { - failure( "Rater indicated success, but no data was returned" ); - return; - } - - // that's right; who da man (or wo-man)? - success( msg.data, ( msg.actions || [] ) ); -} - - -function purgeRateRequests( msg ) -{ - // this is never a good thing - for ( var rqid in _requests ) - { - // invoke failure function - _requests[ rqid ][ 1 ]( msg ); - } - - // clear out request references - _requests = {}; -} - - -function getsignum( sig ) -{ - return 128 + ( _signum[ sig ] || 0 ); -} - - -function createRqid( qid, indv ) -{ - return qid + '_' - + ( indv ? indv + '_' : '' ) - + ( new Date() ).getTime(); -} -