DeltaPublisher: Remove parsing from constructor
parent
4383d15c5a
commit
e885026e0a
|
@ -39,6 +39,7 @@ import {
|
|||
createPrometheusConfig,
|
||||
} from '../src/system/PrometheusFactory';
|
||||
import { AmqpConnection } from '../src/system/amqp/AmqpConnection';
|
||||
import { parse as avro_parse } from "avro-js";
|
||||
|
||||
|
||||
const amqp_conf = createAmqpConfig( process.env );
|
||||
|
@ -55,6 +56,7 @@ const publisher = new DeltaPublisher(
|
|||
ts_ctr,
|
||||
createAvroEncoder,
|
||||
amqp_connection,
|
||||
avro_parse( __dirname + '/../src/system/avro/schema.avsc' ),
|
||||
);
|
||||
|
||||
// Prometheus Metrics
|
||||
|
|
|
@ -30,16 +30,10 @@ import { AmqpError } from '../error/AmqpError';
|
|||
import { AvroEncoderCtr } from './avro/AvroFactory';
|
||||
import { AmqpConnection } from './amqp/AmqpConnection';
|
||||
|
||||
import { AvroSchema, parse } from "avro-js";
|
||||
import { AvroSchema } from "avro-js";
|
||||
|
||||
export class DeltaPublisher implements AmqpPublisher
|
||||
{
|
||||
/** The avro schema */
|
||||
private _schema: AvroSchema;
|
||||
|
||||
/** The path to the avro schema */
|
||||
readonly SCHEMA_PATH = __dirname + '/avro/schema.avsc';
|
||||
|
||||
/** A mapping of which delta type translated to which avro event */
|
||||
readonly DELTA_MAP: Record<string, string> = {
|
||||
data: 'STEP_SAVE',
|
||||
|
@ -58,11 +52,10 @@ export class DeltaPublisher implements AmqpPublisher
|
|||
constructor(
|
||||
private readonly _emitter: EventEmitter,
|
||||
private readonly _ts_ctr: () => UnixTimestamp,
|
||||
private readonly _encoder_ctr: AvroEncoderCtr,
|
||||
private readonly _encoder_ctor: AvroEncoderCtr,
|
||||
private readonly _conn: AmqpConnection,
|
||||
) {
|
||||
this._schema = parse( this.SCHEMA_PATH );
|
||||
}
|
||||
private readonly _schema: AvroSchema,
|
||||
) {}
|
||||
|
||||
|
||||
/**
|
||||
|
@ -263,7 +256,7 @@ export class DeltaPublisher implements AmqpPublisher
|
|||
{
|
||||
this._assertValidAvro( this._schema, data )
|
||||
|
||||
const encoder = this._encoder_ctr( this._schema )
|
||||
const encoder = this._encoder_ctor( this._schema )
|
||||
|
||||
encoder.on('data', ( buf: Buffer ) => { bufs.push( buf ) } )
|
||||
encoder.on('error', ( err: Error ) => { reject( err ); } )
|
||||
|
|
|
@ -28,10 +28,7 @@ import { EventEmitter } from "events";
|
|||
import { hasContext } from '../../src/error/ContextError';
|
||||
import { AmqpError } from '../../src/error/AmqpError';
|
||||
import { Channel } from 'amqplib';
|
||||
import {
|
||||
createAvroEncoder,
|
||||
AvroEncoderCtr,
|
||||
} from '../../src/system/avro/AvroFactory';
|
||||
import { AvroEncoderCtr } from '../../src/system/avro/AvroFactory';
|
||||
|
||||
import { AvroSchema } from "avro-js";
|
||||
|
||||
|
@ -66,7 +63,20 @@ describe( 'server.DeltaPublisher', () =>
|
|||
};
|
||||
};
|
||||
|
||||
const sut = new Sut( emitter, ts_ctr, createAvroEncoder, conn );
|
||||
const stub_schema = <AvroSchema>(<unknown>{
|
||||
isValid()
|
||||
{
|
||||
// TODO: test me
|
||||
},
|
||||
} );
|
||||
|
||||
const sut = new Sut(
|
||||
emitter,
|
||||
ts_ctr,
|
||||
createMockEncoderCtor( stub_schema ),
|
||||
conn,
|
||||
stub_schema,
|
||||
);
|
||||
|
||||
return expect(
|
||||
sut.publish( <DocumentId>123, delta, bucket, ratedata )
|
||||
|
@ -118,8 +128,20 @@ describe( 'server.DeltaPublisher', () =>
|
|||
|
||||
conn.getAmqpChannel = getChannelF;
|
||||
|
||||
const result = new Sut( emitter, ts_ctr, createAvroEncoder, conn )
|
||||
.publish( doc_id, delta, bucket, ratedata );
|
||||
const stub_schema = <AvroSchema>(<unknown>{
|
||||
isValid()
|
||||
{
|
||||
// TODO: test me
|
||||
},
|
||||
} );
|
||||
|
||||
const result = new Sut(
|
||||
emitter,
|
||||
ts_ctr,
|
||||
createMockEncoderCtor( stub_schema ),
|
||||
conn,
|
||||
stub_schema,
|
||||
).publish( doc_id, delta, bucket, ratedata );
|
||||
|
||||
return Promise.all( [
|
||||
expect( result ).to.eventually.be.rejectedWith(
|
||||
|
@ -225,11 +247,20 @@ describe( 'server.DeltaPublisher', () =>
|
|||
const emitter = createMockEventEmitter();
|
||||
const conn = createMockAmqpConnection();
|
||||
const data = createMockData( delta_data );
|
||||
const sut = new Sut(
|
||||
|
||||
const stub_schema = <AvroSchema>(<unknown>{
|
||||
isValid()
|
||||
{
|
||||
// TODO: test me
|
||||
},
|
||||
} );
|
||||
|
||||
const sut = new Sut(
|
||||
emitter,
|
||||
ts_ctr,
|
||||
createAvroEncoder,
|
||||
createMockEncoderCtor( stub_schema ),
|
||||
conn,
|
||||
stub_schema
|
||||
);
|
||||
|
||||
sut.avroEncode( data )
|
||||
|
@ -384,11 +415,13 @@ describe( 'server.DeltaPublisher', () =>
|
|||
const emitter = createMockEventEmitter();
|
||||
const conn = createMockAmqpConnection();
|
||||
const avroEncoderCtr = createMockEncoder( encoded );
|
||||
const stub_schema = <AvroSchema>{};
|
||||
const sut = new Sut(
|
||||
emitter,
|
||||
ts_ctr,
|
||||
avroEncoderCtr,
|
||||
conn,
|
||||
stub_schema,
|
||||
);
|
||||
const actual = sut.setDataTypes( delta_data );
|
||||
|
||||
|
@ -483,3 +516,28 @@ function createMockDelta(): Delta<any>
|
|||
data: <DeltaResult<any>>{},
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
function createMockEncoderCtor( stub_schema: AvroSchema ):
|
||||
( schema: AvroSchema ) => Duplex
|
||||
{
|
||||
const events = <Record<string, () => void>>{};
|
||||
|
||||
const mock_duplex = <Duplex>(<unknown>{
|
||||
on( event_name: string, callback: () => void )
|
||||
{
|
||||
events[ event_name ] = callback;
|
||||
},
|
||||
|
||||
end()
|
||||
{
|
||||
events.end();
|
||||
},
|
||||
} );
|
||||
|
||||
return ( schema: AvroSchema ): Duplex =>
|
||||
{
|
||||
expect( schema ).to.equal( stub_schema );
|
||||
return mock_duplex;
|
||||
};
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue