1
0
Fork 0

AmqpConnection: Propagate assertExchange promise

Failures were being ignored.
master
Mike Gerwitz 2019-12-09 11:40:02 -05:00 committed by Austin Schaffer
parent d9ee999adb
commit 9d6cb23e16
4 changed files with 88 additions and 18 deletions

View File

@ -18,6 +18,7 @@
* You should have received a copy of the GNU General Public License * You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
import * as amqplib from "amqplib";
import { createAmqpConfig } from '../src/system/AmqpPublisher'; import { createAmqpConfig } from '../src/system/AmqpPublisher';
import { MongoDeltaDao } from '../src/system/db/MongoDeltaDao'; import { MongoDeltaDao } from '../src/system/db/MongoDeltaDao';
import { DeltaProcessor } from '../src/system/DeltaProcessor'; import { DeltaProcessor } from '../src/system/DeltaProcessor';
@ -48,7 +49,7 @@ const process_interval_ms = +( process.env.process_interval_ms || 2000 );
const env = process.env.NODE_ENV || 'Unknown Environment'; const env = process.env.NODE_ENV || 'Unknown Environment';
const emitter = new EventEmitter(); const emitter = new EventEmitter();
const log = new StandardLogger( console, ts_ctr, env ); const log = new StandardLogger( console, ts_ctr, env );
const amqp_connection = new AmqpConnection( amqp_conf, emitter ); const amqp_connection = new AmqpConnection( amqplib, amqp_conf, emitter );
const publisher = new DeltaPublisher( const publisher = new DeltaPublisher(
emitter, emitter,
ts_ctr, ts_ctr,

View File

@ -17,25 +17,23 @@
* *
* You should have received a copy of the GNU General Public License * You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* Amqp Connection
*/ */
import { AmqpConfig } from '../AmqpPublisher'; import { AmqpConfig } from '../AmqpPublisher';
import { EventEmitter } from "events"; import { EventEmitter } from "events";
import { import * as amqplib from "amqplib";
connect as AmqpConnect,
Channel,
Connection,
} from 'amqplib';
/**
* Connection to AMQP exchange
*
* XXX: Needs tests!
*/
export class AmqpConnection export class AmqpConnection
{ {
/** The amqp connection */ /** The amqp connection */
private _conn?: Connection; private _conn?: amqplib.Connection;
/** The amqp channel */ /** The amqp channel */
private _channel?: Channel; private _channel?: amqplib.Channel;
/** /**
@ -45,6 +43,7 @@ export class AmqpConnection
* @param _emitter - event emitter instance * @param _emitter - event emitter instance
*/ */
constructor( constructor(
private readonly _amqp: typeof amqplib,
private readonly _conf: AmqpConfig, private readonly _conf: AmqpConfig,
private readonly _emitter: EventEmitter, private readonly _emitter: EventEmitter,
) {} ) {}
@ -55,7 +54,7 @@ export class AmqpConnection
*/ */
connect(): Promise<void> connect(): Promise<void>
{ {
return AmqpConnect( this._conf ) return this._amqp.connect( this._conf )
.then( conn => .then( conn =>
{ {
this._conn = conn; this._conn = conn;
@ -72,16 +71,17 @@ export class AmqpConnection
return this._conn.createChannel(); return this._conn.createChannel();
} ) } )
.then( ( ch: Channel ) => .then( ( ch: amqplib.Channel ) =>
{ {
this._channel = ch; this._channel = ch;
this._channel.assertExchange( return this._channel.assertExchange(
this._conf.exchange, this._conf.exchange,
'fanout', 'fanout',
{ durable: true } { durable: true }
); );
} ); } )
.then( _ => {} );
} }
@ -130,7 +130,7 @@ export class AmqpConnection
* *
* @return exchange name * @return exchange name
*/ */
getAmqpChannel(): Channel | undefined getAmqpChannel(): amqplib.Channel | undefined
{ {
if ( !this._channel ) if ( !this._channel )
{ {

View File

@ -0,0 +1,69 @@
/**
* Tests AmqpConnection
*
* Copyright (C) 2010-2019 R-T Specialty, LLC.
*
* This file is part of liza.
*
* liza is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* Amqp Connection
*/
import { AmqpConnection as Sut } from "../../../src/system/amqp/AmqpConnection";
import { AmqpConfig } from "../../../src/system/AmqpPublisher";
import { EventEmitter } from "events";
import * as amqplib from "amqplib";
import { expect, use as chai_use } from 'chai';
chai_use( require( 'chai-as-promised' ) );
describe( 'AmqpConnection', () =>
{
describe( '#connect', () =>
{
it( "fails when exchange cannot be asserted", () =>
{
const expected_err = new Error( "test failure" );
const mock_channel = <amqplib.Channel>(<unknown>{
assertExchange() {
return Promise.reject( expected_err );
},
});
const mock_connection = <amqplib.Connection>(<unknown>{
once() {},
createChannel() {
return Promise.resolve( mock_channel );
},
});
const mock_amqp = <typeof amqplib>(<unknown>{
connect() {
return Promise.resolve( mock_connection );
}
});
const emitter = new EventEmitter();
const conf = <AmqpConfig>{};
const sut = new Sut( mock_amqp, conf, emitter );
return expect( sut.connect() )
.to.eventually.be.rejectedWith( expected_err );
} );
} );
} );