diff --git a/bin/delta-processor.ts b/bin/delta-processor.ts index 522b98b..e8a4a92 100644 --- a/bin/delta-processor.ts +++ b/bin/delta-processor.ts @@ -18,6 +18,7 @@ * You should have received a copy of the GNU General Public License * along with this program. If not, see . */ +import * as amqplib from "amqplib"; import { createAmqpConfig } from '../src/system/AmqpPublisher'; import { MongoDeltaDao } from '../src/system/db/MongoDeltaDao'; import { DeltaProcessor } from '../src/system/DeltaProcessor'; @@ -48,7 +49,7 @@ const process_interval_ms = +( process.env.process_interval_ms || 2000 ); const env = process.env.NODE_ENV || 'Unknown Environment'; const emitter = new EventEmitter(); const log = new StandardLogger( console, ts_ctr, env ); -const amqp_connection = new AmqpConnection( amqp_conf, emitter ); +const amqp_connection = new AmqpConnection( amqplib, amqp_conf, emitter ); const publisher = new DeltaPublisher( emitter, ts_ctr, diff --git a/src/system/DeltaProcessor.ts b/src/system/DeltaProcessor.ts index b136f80..7433bd8 100644 --- a/src/system/DeltaProcessor.ts +++ b/src/system/DeltaProcessor.ts @@ -219,4 +219,4 @@ export class DeltaProcessor return 0; } -} \ No newline at end of file +} diff --git a/src/system/amqp/AmqpConnection.ts b/src/system/amqp/AmqpConnection.ts index 13b9791..6d50bfc 100644 --- a/src/system/amqp/AmqpConnection.ts +++ b/src/system/amqp/AmqpConnection.ts @@ -17,25 +17,23 @@ * * You should have received a copy of the GNU General Public License * along with this program. If not, see . - * - * Amqp Connection */ import { AmqpConfig } from '../AmqpPublisher'; import { EventEmitter } from "events"; -import { - connect as AmqpConnect, - Channel, - Connection, -} from 'amqplib'; - +import * as amqplib from "amqplib"; +/** + * Connection to AMQP exchange + * + * XXX: Needs tests! + */ export class AmqpConnection { /** The amqp connection */ - private _conn?: Connection; + private _conn?: amqplib.Connection; /** The amqp channel */ - private _channel?: Channel; + private _channel?: amqplib.Channel; /** @@ -45,6 +43,7 @@ export class AmqpConnection * @param _emitter - event emitter instance */ constructor( + private readonly _amqp: typeof amqplib, private readonly _conf: AmqpConfig, private readonly _emitter: EventEmitter, ) {} @@ -55,7 +54,7 @@ export class AmqpConnection */ connect(): Promise { - return AmqpConnect( this._conf ) + return this._amqp.connect( this._conf ) .then( conn => { this._conn = conn; @@ -72,16 +71,17 @@ export class AmqpConnection return this._conn.createChannel(); } ) - .then( ( ch: Channel ) => + .then( ( ch: amqplib.Channel ) => { this._channel = ch; - this._channel.assertExchange( + return this._channel.assertExchange( this._conf.exchange, 'fanout', { durable: true } ); - } ); + } ) + .then( _ => {} ); } @@ -130,7 +130,7 @@ export class AmqpConnection * * @return exchange name */ - getAmqpChannel(): Channel | undefined + getAmqpChannel(): amqplib.Channel | undefined { if ( !this._channel ) { @@ -151,4 +151,4 @@ export class AmqpConnection this._conn.close.bind(this._conn); } } -} \ No newline at end of file +} diff --git a/test/system/amqp/AmqpConnectionTest.ts b/test/system/amqp/AmqpConnectionTest.ts new file mode 100644 index 0000000..36a332c --- /dev/null +++ b/test/system/amqp/AmqpConnectionTest.ts @@ -0,0 +1,69 @@ +/** + * Tests AmqpConnection + * + * Copyright (C) 2010-2019 R-T Specialty, LLC. + * + * This file is part of liza. + * + * liza is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + * Amqp Connection + */ + +import { AmqpConnection as Sut } from "../../../src/system/amqp/AmqpConnection"; +import { AmqpConfig } from "../../../src/system/AmqpPublisher"; +import { EventEmitter } from "events"; +import * as amqplib from "amqplib"; + +import { expect, use as chai_use } from 'chai'; +chai_use( require( 'chai-as-promised' ) ); + +describe( 'AmqpConnection', () => +{ + describe( '#connect', () => + { + it( "fails when exchange cannot be asserted", () => + { + const expected_err = new Error( "test failure" ); + + const mock_channel = ({ + assertExchange() { + return Promise.reject( expected_err ); + }, + }); + + const mock_connection = ({ + once() {}, + + createChannel() { + return Promise.resolve( mock_channel ); + }, + }); + + const mock_amqp = ({ + connect() { + return Promise.resolve( mock_connection ); + } + }); + + const emitter = new EventEmitter(); + const conf = {}; + const sut = new Sut( mock_amqp, conf, emitter ); + + return expect( sut.connect() ) + .to.eventually.be.rejectedWith( expected_err ); + } ); + } ); +} ); +