diff --git a/bin/delta-processor.ts b/bin/delta-processor.ts index e8a4a92..11af02b 100644 --- a/bin/delta-processor.ts +++ b/bin/delta-processor.ts @@ -39,6 +39,7 @@ import { createPrometheusConfig, } from '../src/system/PrometheusFactory'; import { AmqpConnection } from '../src/system/amqp/AmqpConnection'; +import { parse as avro_parse } from "avro-js"; const amqp_conf = createAmqpConfig( process.env ); @@ -55,6 +56,7 @@ const publisher = new DeltaPublisher( ts_ctr, createAvroEncoder, amqp_connection, + avro_parse( __dirname + '/../src/system/avro/schema.avsc' ), ); // Prometheus Metrics diff --git a/src/system/DeltaPublisher.ts b/src/system/DeltaPublisher.ts index 5def3a9..9b05114 100644 --- a/src/system/DeltaPublisher.ts +++ b/src/system/DeltaPublisher.ts @@ -30,16 +30,10 @@ import { AmqpError } from '../error/AmqpError'; import { AvroEncoderCtr } from './avro/AvroFactory'; import { AmqpConnection } from './amqp/AmqpConnection'; -import { AvroSchema, parse } from "avro-js"; +import { AvroSchema } from "avro-js"; export class DeltaPublisher implements AmqpPublisher { - /** The avro schema */ - private _schema: AvroSchema; - - /** The path to the avro schema */ - readonly SCHEMA_PATH = __dirname + '/avro/schema.avsc'; - /** A mapping of which delta type translated to which avro event */ readonly DELTA_MAP: Record = { data: 'STEP_SAVE', @@ -58,11 +52,10 @@ export class DeltaPublisher implements AmqpPublisher constructor( private readonly _emitter: EventEmitter, private readonly _ts_ctr: () => UnixTimestamp, - private readonly _encoder_ctr: AvroEncoderCtr, + private readonly _encoder_ctor: AvroEncoderCtr, private readonly _conn: AmqpConnection, - ) { - this._schema = parse( this.SCHEMA_PATH ); - } + private readonly _schema: AvroSchema, + ) {} /** @@ -263,7 +256,7 @@ export class DeltaPublisher implements AmqpPublisher { this._assertValidAvro( this._schema, data ) - const encoder = this._encoder_ctr( this._schema ) + const encoder = this._encoder_ctor( this._schema ) encoder.on('data', ( buf: Buffer ) => { bufs.push( buf ) } ) encoder.on('error', ( err: Error ) => { reject( err ); } ) diff --git a/test/system/DeltaPublisherTest.ts b/test/system/DeltaPublisherTest.ts index b5dfed1..314c7da 100644 --- a/test/system/DeltaPublisherTest.ts +++ b/test/system/DeltaPublisherTest.ts @@ -28,10 +28,7 @@ import { EventEmitter } from "events"; import { hasContext } from '../../src/error/ContextError'; import { AmqpError } from '../../src/error/AmqpError'; import { Channel } from 'amqplib'; -import { - createAvroEncoder, - AvroEncoderCtr, -} from '../../src/system/avro/AvroFactory'; +import { AvroEncoderCtr } from '../../src/system/avro/AvroFactory'; import { AvroSchema } from "avro-js"; @@ -66,7 +63,20 @@ describe( 'server.DeltaPublisher', () => }; }; - const sut = new Sut( emitter, ts_ctr, createAvroEncoder, conn ); + const stub_schema = ({ + isValid() + { + // TODO: test me + }, + } ); + + const sut = new Sut( + emitter, + ts_ctr, + createMockEncoderCtor( stub_schema ), + conn, + stub_schema, + ); return expect( sut.publish( 123, delta, bucket, ratedata ) @@ -118,8 +128,20 @@ describe( 'server.DeltaPublisher', () => conn.getAmqpChannel = getChannelF; - const result = new Sut( emitter, ts_ctr, createAvroEncoder, conn ) - .publish( doc_id, delta, bucket, ratedata ); + const stub_schema = ({ + isValid() + { + // TODO: test me + }, + } ); + + const result = new Sut( + emitter, + ts_ctr, + createMockEncoderCtor( stub_schema ), + conn, + stub_schema, + ).publish( doc_id, delta, bucket, ratedata ); return Promise.all( [ expect( result ).to.eventually.be.rejectedWith( @@ -225,11 +247,20 @@ describe( 'server.DeltaPublisher', () => const emitter = createMockEventEmitter(); const conn = createMockAmqpConnection(); const data = createMockData( delta_data ); - const sut = new Sut( + + const stub_schema = ({ + isValid() + { + // TODO: test me + }, + } ); + + const sut = new Sut( emitter, ts_ctr, - createAvroEncoder, + createMockEncoderCtor( stub_schema ), conn, + stub_schema ); sut.avroEncode( data ) @@ -384,11 +415,13 @@ describe( 'server.DeltaPublisher', () => const emitter = createMockEventEmitter(); const conn = createMockAmqpConnection(); const avroEncoderCtr = createMockEncoder( encoded ); + const stub_schema = {}; const sut = new Sut( emitter, ts_ctr, avroEncoderCtr, conn, + stub_schema, ); const actual = sut.setDataTypes( delta_data ); @@ -483,3 +516,28 @@ function createMockDelta(): Delta data: >{}, } } + + +function createMockEncoderCtor( stub_schema: AvroSchema ): + ( schema: AvroSchema ) => Duplex +{ + const events = void>>{}; + + const mock_duplex = ({ + on( event_name: string, callback: () => void ) + { + events[ event_name ] = callback; + }, + + end() + { + events.end(); + }, + } ); + + return ( schema: AvroSchema ): Duplex => + { + expect( schema ).to.equal( stub_schema ); + return mock_duplex; + }; +}