diff --git a/.env b/.env new file mode 100644 index 0000000..983d285 --- /dev/null +++ b/.env @@ -0,0 +1,7 @@ +hostname=localhost +port=5672 +username= +password= +vhost=dev +exchange=quoteupdate + diff --git a/conf/vanilla-server.json b/conf/vanilla-server.json index 4f79ad6..e924fb1 100644 --- a/conf/vanilla-server.json +++ b/conf/vanilla-server.json @@ -61,18 +61,6 @@ "heartbeat": 0, "vhost": "/", "exchange": "postrate" - }, - "deltaPublish": { - "protocol": "amqp", - "hostname": "localhost", - "port": 5672, - "username": "", - "password": "", - "locale": "en_US", - "frameMax": 0, - "heartbeat": 0, - "vhost": "/", - "exchange": "quoteupdate" } }, "c1export": { diff --git a/package.json.in b/package.json.in index d81861d..0aea8e0 100644 --- a/package.json.in +++ b/package.json.in @@ -39,7 +39,8 @@ "@types/mocha": "5.2.0", "sinon": ">=1.17.4", "es6-promise": "~3", - "@types/amqplib": "0.5.13" + "@types/amqplib": "0.5.13", + "avro-js": "1.9.1" }, "licenses": [ diff --git a/src/quote/BaseQuote.d.ts b/src/quote/BaseQuote.d.ts index 8273ad1..eb2204a 100644 --- a/src/quote/BaseQuote.d.ts +++ b/src/quote/BaseQuote.d.ts @@ -34,7 +34,7 @@ export declare class BaseQuote implements Quote * * @return quote program */ - getProgram(): Program + getProgram(): Program; /** @@ -42,7 +42,7 @@ export declare class BaseQuote implements Quote * * @return program id */ - getProgramId(): string + getProgramId(): string; /** @@ -54,7 +54,7 @@ export declare class BaseQuote implements Quote * * @return quote id */ - getId(): QuoteId + getId(): QuoteId; /** @@ -62,7 +62,7 @@ export declare class BaseQuote implements Quote * * @return id of current step */ - getCurrentStepId(): number + getCurrentStepId(): number; /** @@ -73,7 +73,7 @@ export declare class BaseQuote implements Quote * * @return self */ - setExplicitLock( reason: string, step: number ): this + setExplicitLock( reason: string, step: number ): this; /** @@ -83,7 +83,7 @@ export declare class BaseQuote implements Quote * * @return self */ - setLastPremiumDate( timestamp: UnixTimestamp ): this + setLastPremiumDate( timestamp: UnixTimestamp ): this; /** @@ -91,7 +91,7 @@ export declare class BaseQuote implements Quote * * @return last calculated time or 0 */ - getLastPremiumDate(): UnixTimestamp + getLastPremiumDate(): UnixTimestamp; /** @@ -99,7 +99,7 @@ export declare class BaseQuote implements Quote * * @return the data bucket */ - getBucket(): QuoteDataBucket + getBucket(): QuoteDataBucket; /** @@ -107,7 +107,7 @@ export declare class BaseQuote implements Quote * * @return lock reason */ - getExplicitLockReason(): string + getExplicitLockReason(): string; /** @@ -117,7 +117,7 @@ export declare class BaseQuote implements Quote * * @return {number} locked max step or 0 if not applicable */ - getExplicitLockStep(): PositiveInteger + getExplicitLockStep(): PositiveInteger; /** @@ -125,7 +125,7 @@ export declare class BaseQuote implements Quote * * @return true if imported, otherwise false */ - isImported(): boolean + isImported(): boolean; /** @@ -133,7 +133,7 @@ export declare class BaseQuote implements Quote * * @return true if bound, otherwise false */ - isBound(): boolean + isBound(): boolean; /** @@ -141,7 +141,7 @@ export declare class BaseQuote implements Quote * * @return top visited step id */ - getTopVisitedStepId(): PositiveInteger + getTopVisitedStepId(): PositiveInteger; /** @@ -149,5 +149,5 @@ export declare class BaseQuote implements Quote * * @return top saved step id */ - getTopSavedStepId(): PositiveInteger + getTopSavedStepId(): PositiveInteger; } diff --git a/src/system/DeltaProcessor.ts b/src/system/DeltaProcessor.ts index 67e372d..678e700 100644 --- a/src/system/DeltaProcessor.ts +++ b/src/system/DeltaProcessor.ts @@ -168,4 +168,26 @@ export class DeltaProcessor return 0; } + + + /** + * Generate amqp config from environment variables + * + * @returns the amqp configuration + */ + // generateConfigFromEnv(): AmqpConfig + // { + // return { + // "protocol": "amqp", + // "hostname": process.env.hostname, + // "port": process.env.port, + // "username": process.env.username, + // "password": process.env.password, + // "locale": "en_US", + // "frameMax": 0, + // "heartbeat": 0, + // "vhost": process.env.vhost, + // "exchange": process.env.exchange, + // }; + // } } \ No newline at end of file diff --git a/src/system/DeltaPublisher.ts b/src/system/DeltaPublisher.ts index 2606c56..57a74b6 100644 --- a/src/system/DeltaPublisher.ts +++ b/src/system/DeltaPublisher.ts @@ -29,6 +29,8 @@ import { Channel } from 'amqplib'; +const avro = require( 'avro-js' ); + export interface AmqpConfig extends Options.Connect { /** The name of a queue or exchange to publish to */ @@ -38,6 +40,9 @@ export interface AmqpConfig extends Options.Connect { export class DeltaPublisher implements AmqpPublisher { + /** The path to the avro schema */ + readonly SCHEMA_PATH = './avro/schema.avsc'; + /** A mapping of which delta type translated to which avro event */ readonly DELTA_MAP: Record = { data: 'rate', @@ -48,8 +53,8 @@ export class DeltaPublisher implements AmqpPublisher /** * Initialize trait * - * @param {Object} conf AMQP configuration - * @param {DebugLog} logger logger instance + * @param _conf - amqp configuration + * @param _logger - logger instance */ constructor( private readonly _conf: AmqpConfig, @@ -115,10 +120,12 @@ export class DeltaPublisher implements AmqpPublisher const event_id = this.DELTA_MAP[ delta.type ]; - const data = new Buffer( JSON.stringify( { + const data = { delta: delta, event: event_id, - } ) ); + }; + + const avro_buffer = this._avroEncode( data ); // we don't use a routing key; fanout exchange const routing_key = ''; @@ -126,8 +133,25 @@ export class DeltaPublisher implements AmqpPublisher return channel.publish( exchange, routing_key, - data, + avro_buffer, { headers: headers }, ); } + + + /** + * Encode the data in an avro buffer + * + * @param data - the data to encode + * + * @return the avro buffer + */ + _avroEncode( data: Record ): Buffer + { + const type = avro.parse( this.SCHEMA_PATH ); + + const buffer = type.toBuffer( data ); + + return buffer; + } } diff --git a/src/system/avro/schema.avsc b/src/system/avro/schema.avsc new file mode 100644 index 0000000..ee793a6 --- /dev/null +++ b/src/system/avro/schema.avsc @@ -0,0 +1,161 @@ +{ + "type": "record", + "name": "update", + "fields": [ + { + "name": "event", + "type": { + "type": "record", + "name": "Event", + "fields": [ + { + "name": "id", + "type": { + "name": "EventId", + "type": "enum", + "symbols": [ + "STEP_SAVE", + "RATE" + ] + } + }, + { + "name": "ts", + "type": "long", + "logicalType": "timestamp-millis" + }, + { + "name": "actor", + "type": { + "type": "enum", + "name": "EventActor", + "symbols": [ "USER", "CLIENT", "SERVER" ] + } + }, + { + "name": "step", + "type": { + "type": "record", + "name": "EventStep", + "fields": [ + { + "name": "transition", + "type": { + "type": "enum", + "name": "EventStepTransition", + "symbols": [ "BACK", "FORWARD", "END" ] + } + }, + { + "name": "src", + "type": "string" + }, + { + "name": "dest", + "type": "string" + } + ] + } + } + ] + } + }, + { + "name": "document", + "type": { + "type": "record", + "name": "Document", + "doc": "Source document (quote)", + "fields": [ + { + "name": "id", + "type": "int" + }, + { + "name": "created", + "type": "long", + "logicalType": "timestamp-millis" + }, + { + "name": "modified", + "type": "long", + "logicalType": "timestamp-millis" + }, + { + "name": "top_visited_step", + "type": "string" + } + ] + } + }, + { + "name": "session", + "type": { + "type": "record", + "name": "Session", + "fields": [ + { + "name": "entity_name", + "type": "string" + }, + { + "name": "entity_id", + "type": "int" + } + ] + } + }, + { + "name": "data", + "type": [ + "null", + { + "type": "record", + "name": "Data", + "fields": [ + { + "name": "bucket", + "type": { + "type": "map", + "values": { + "type" : "array", + "items" : "string" + } + } + } + ] + } + ] + }, + { + "name": "delta", + "type": [ + "null", + "Data" + ] + }, + { + "name": "program", + "type": [ + "null", + { + "type": "record", + "name": "Program", + "fields": [ + { + "type": "string", + "name": "id", + "doc": "Program id" + }, + { + "type": "string", + "name": "version", + "doc": "Program version" + } + ] + } + ] + } + ] +} +