1
0
Fork 0

[DEV-5312] Update the avro schema to accept arbitrary array depths

master
Austin Schaffer 2019-12-17 13:20:20 -05:00
commit c64ecbc816
12 changed files with 606 additions and 152 deletions

View File

@ -272,8 +272,9 @@ export class MongoTokenDao implements TokenDao
`Unknown token namespace '${ns}' for document '${doc_id}`
),
{
doc_id: doc_id,
ns: ns,
doc_id: doc_id,
quote_id: doc_id,
ns: ns,
}
) );
@ -317,8 +318,9 @@ export class MongoTokenDao implements TokenDao
`on document '${doc_id}'`
),
{
doc_id: doc_id,
ns: ns,
doc_id: doc_id,
quote_id: doc_id,
ns: ns,
},
);
}
@ -363,6 +365,7 @@ export class MongoTokenDao implements TokenDao
),
{
doc_id: doc_id,
quote_id: doc_id,
ns: ns,
token_id: token_id,
},

View File

@ -22,6 +22,7 @@
import { DeltaDao } from '../system/db/DeltaDao';
import { DocumentMeta } from '../document/Document';
import { AmqpPublisher } from './AmqpPublisher';
import { context } from '../error/ContextError';
import { EventEmitter } from 'events';
import {
DeltaType,
@ -122,11 +123,25 @@ export class DeltaProcessor
)
.then( _ =>
{
this._emitter.emit( 'document-processed', { doc_id: meta.id } );
this._emitter.emit(
'document-processed',
{
doc_id: meta.id,
quote_id: meta.id,
},
);
} )
.catch( ( e: Error ) =>
{
this._emitter.emit( 'error', e );
const context_error = context(
e,
{
doc_id: meta.id,
quote_id: meta.id,
},
);
this._emitter.emit( 'error', context_error );
return this._dao.setErrorFlag( meta.id );
} );
}

View File

@ -84,6 +84,7 @@ export class DeltaPublisher implements AmqpPublisher
new AmqpError( 'Error sending message: No channel' ),
{
doc_id: meta.id,
quote_id: meta.id,
delta_type: delta.type,
delta_ts: delta.timestamp,
},
@ -104,6 +105,7 @@ export class DeltaPublisher implements AmqpPublisher
new Error ( 'Delta publish failed' ),
{
doc_id: meta.id,
quote_id: meta.id,
delta_type: delta.type,
delta_ts: delta.timestamp,
}
@ -115,7 +117,9 @@ export class DeltaPublisher implements AmqpPublisher
this._emitter.emit(
'delta-publish',
{
delta: delta,
doc_id: meta.id,
quote_id: meta.id,
delta: delta,
exchange: this._conn.getExchangeName(),
}
);

View File

@ -24,7 +24,7 @@ import { PsrLogger, LogLevel } from './PsrLogger';
declare type StructuredLog = {
message: string;
timestamp: UnixTimestamp;
timestamp: string;
service: string;
env: string;
severity: string;
@ -181,9 +181,12 @@ export class StandardLogger implements PsrLogger
str = msg;
}
const ts = this._ts_ctr();
const tsFormatted = new Date( ts * 1000 ).toISOString()
const structured_log = <StructuredLog>{
message: str,
timestamp: this._ts_ctr(),
timestamp: tsFormatted,
service: 'quote-server',
env: this._env,
severity: LogLevel[level],

View File

@ -196,12 +196,12 @@ export class V1MessageWriter implements MessageWriter
/**
* Format the data for avro by add type specifications to the data
*
* @param data - the data to format
* @param top_level - whether we are at the top level of the recursion
* @param data - the data to format
* @param depth - recursion depth
*
* @return the formatted data
*/
setDataTypes( data: any, top_level: boolean = true ): any
setDataTypes( data: any, depth: number = 0 ): any
{
let data_formatted: any = {};
@ -210,7 +210,7 @@ export class V1MessageWriter implements MessageWriter
case 'object':
if ( data == null )
{
return null;
data_formatted = null;
}
else if ( Array.isArray( data ) )
{
@ -218,10 +218,10 @@ export class V1MessageWriter implements MessageWriter
data.forEach( ( datum ) =>
{
arr.push( this.setDataTypes( datum, false ) );
arr.push( this.setDataTypes( datum, depth + 1 ) );
} );
data_formatted = ( top_level )
data_formatted = ( depth < 1 )
? arr
: { 'array': arr };
}
@ -231,29 +231,34 @@ export class V1MessageWriter implements MessageWriter
Object.keys( data).forEach( ( key: string ) =>
{
const datum = this.setDataTypes( data[ key ], false );
const datum = this.setDataTypes( data[ key ], depth + 1 );
datum_formatted[ key ] = datum;
} );
data_formatted = ( top_level )
data_formatted = ( depth < 1 )
? datum_formatted
: { 'map': datum_formatted };
}
break;
case 'boolean':
return { 'boolean': data };
return { 'bucket': { 'map': { 'boolean': data } } };
case 'number':
return { 'double': data };
return { 'bucket': { 'map': { 'double': data } } };
case 'string':
return { 'string': data };
return { 'bucket': { 'map': { 'string': data } } };
case 'undefined':
return null;
return { 'bucket': { 'map': null } };
}
if ( depth > 1 )
{
return { 'bucket': { 'map': data_formatted } };
}
return data_formatted;

View File

@ -123,50 +123,16 @@
"type": "map",
"values": [
"null",
"boolean",
"double",
"string",
{
"type": "array",
"items": [
"null",
"boolean",
"double",
"string",
{
"type": "array",
"items": [
"null",
"boolean",
"double",
"string",
{
"type": "array",
"items": [
"null",
"boolean",
"double",
"string"
]
}
]
},
{
"type": "map",
"values": [
"null",
"boolean",
"double",
"string",
{
"type": "map",
"values": [
"null",
"boolean",
"double",
"string"
]
}
]
}
]
"items": "Data"
},
{
"type": "map",
"values": "Data"
}
]
}

View File

@ -133,8 +133,9 @@ export class MongoDeltaDao implements DeltaDao
reject( context(
new DaoError( 'Error advancing delta index: ' + e ),
{
doc_id: doc_id,
type: type,
doc_id: doc_id,
quote_id: doc_id,
type: type,
}
) );
return;
@ -177,6 +178,7 @@ export class MongoDeltaDao implements DeltaDao
),
{
doc_id: doc_id,
quote_id: doc_id,
last_update_ts: last_update_ts,
}
) );
@ -215,7 +217,8 @@ export class MongoDeltaDao implements DeltaDao
'Failed setting error flag: ' + e
),
{
doc_id: doc_id,
doc_id: doc_id,
quote_id: doc_id,
}
) );
return;

View File

@ -372,6 +372,7 @@ describe( 'server.token.TokenDao', () =>
`${ns}.tok123`,
{
doc_id: did,
quote_id: did,
ns: ns,
token_id: 'tok123',
},
@ -386,8 +387,9 @@ describe( 'server.token.TokenDao', () =>
null,
ns,
{
doc_id: did,
ns: ns,
doc_id: did,
quote_id: did,
ns: ns,
},
],
@ -440,8 +442,9 @@ describe( 'server.token.TokenDao', () =>
null,
ns,
{
doc_id: did,
ns: ns,
doc_id: did,
quote_id: did,
ns: ns,
},
],
@ -454,8 +457,9 @@ describe( 'server.token.TokenDao', () =>
null,
ns,
{
doc_id: did,
ns: ns,
doc_id: did,
quote_id: did,
ns: ns,
},
],
] ).forEach( ( [ label, tok_id, dbresult, expected, fmsg, fcontext ] ) =>

View File

@ -123,6 +123,7 @@ describe( 'server.DeltaPublisher', () =>
const expected = {
doc_id: meta.id,
quote_id: meta.id,
delta_type: delta.type,
delta_ts: delta.timestamp
}

View File

@ -26,7 +26,7 @@ import { expect } from 'chai';
import { PsrLogger } from '../../src/system/PsrLogger';
describe( 'system.EventLogger captures and logs events', () =>
describe( 'system.EventMediator captures and logs events', () =>
{
it( 'document-processed triggers log#notice', () =>
{

View File

@ -30,7 +30,7 @@ declare interface MockConsole extends Console {
getStr(): string,
}
describe( 'system.EventLogger captures and logs events', () =>
describe( 'system.StandardLogger captures and logs events', () =>
{
it( 'debug triggers console output level: info', () =>
{
@ -139,7 +139,7 @@ describe( 'system.EventLogger captures and logs events', () =>
const context = { bar: 'baz' };
const expected_output = {
message: 'Foo',
timestamp: 123123,
timestamp: '1970-01-02T10:12:03.000Z',
service: 'quote-server',
env: 'test',
severity: 'NOTICE',

View File

@ -27,7 +27,10 @@ import { AvroEncoderCtr } from '../../src/system/avro/AvroFactory';
import { Delta, DeltaResult, DeltaType } from '../../src/bucket/delta';
import { DocumentMeta, DocumentId } from '../../src/document/Document';
import { Duplex } from 'stream';
import { AvroSchema } from 'avro-js';
import {
AvroSchema,
parse as avro_parse,
} from 'avro-js';
import { expect, use as chai_use } from 'chai';
chai_use( require( 'chai-as-promised' ) );
@ -92,97 +95,297 @@ describe( 'system.V1MessageWriter', () =>
{
label: 'Null array',
valid: true,
delta_data: { foo: { 'array': [ null ] } },
delta_data: { foo: { 'array': [
{ 'bucket': { 'map': null } }
] } },
},
{
label: 'Boolean value',
valid: true,
delta_data: { foo: { 'array': [
{ 'boolean': true },
{ 'bucket': { 'map': { 'boolean': true } } },
] } },
},
{
label: 'Simple string',
valid: true,
delta_data: { foo: { 'array': [
{ 'string': 'bar' },
{ 'string': 'baz' },
{ 'bucket': { 'map': { 'string': 'bar' } } },
{ 'bucket': { 'map': { 'string': 'baz' } } },
] } },
},
{
label: 'Simple int',
valid: true,
delta_data: { foo: { 'array': [
{ 'double': 123 },
{ 'bucket': { 'map': { 'double': 123 } } },
] } },
},
{
label: 'Nested array',
valid: true,
delta_data: { foo: { 'array': [
{ 'array': [
{ 'string': 'bar' },
] },
{ 'bucket': { 'map': { 'array': [
{ 'bucket': { 'map': { 'string': 'bar' } } },
] } } },
] } },
},
{
label: 'Array with nulls',
valid: true,
delta_data: { foo: { 'array': [
{ 'string': 'bar' },
{ 'string': 'baz' },
null,
{ 'bucket': { 'map': { 'string': 'bar' } } },
{ 'bucket': { 'map': { 'string': 'baz' } } },
{ 'bucket': { 'map': null } },
] } },
},
{
label: 'Nested Array with mixed values',
valid: true,
delta_data: { foo: { 'array': [
{ 'array': [
{ 'string': 'bar' },
{ 'double': 123321 },
null,
] }
{ 'bucket': { 'map': { 'array': [
{ 'bucket': { 'map': { 'string': 'bar' } } },
{ 'bucket': { 'map': { 'double': 123321 } } },
{ 'bucket': { 'map': null } },
] } } }
] } },
},
{
label: 'Non-array',
valid: false,
delta_data: { foo: 'bar' },
expected: {
invalid_data: 'bar',
invalid_paths: [
'delta',
'Data',
'bucket',
'foo',
]
}
},
{
label: 'Map objects',
valid: true,
delta_data: { 'foo': { 'array': [
{ 'map': {
'bar': { 'map': {
'baz': { 'double': 1572903485000 },
} }
} }
{ 'bucket': { 'map': { 'map': { 'bar':
{ 'bucket': { 'map': { 'map': { 'baz':
{ 'bucket': { 'map': { 'double': 1572903485000 } } }
} } } }
} } } }
] } },
}
].forEach( ( { label, delta_data, valid } ) =>
},
{
label: 'Arbitrary array/map depth',
valid: true,
delta_data: {
"a": { "array": [
{ "bucket": { "map": { "map": {
"b": { "bucket": { "map": { "array": [
{ "bucket": { "map": {
"string": "c"
} } },
{ "bucket": { "map": { "array": [
{ "bucket": { "map": {
"array": [
{ "bucket": { "map": {
"string": "d"
} } },
{ "bucket": { "map": {
"map": {
"e": { "bucket": { "map": { "string": "f" } } },
"g": { "bucket": { "map": { "string": "h" } } },
"i": { "bucket": { "map": { "string": "j" } } },
"k": { "bucket": { "map": { "string": "l" } } },
"m": { "bucket": { "map": { "string": "n" } } }
}
} } },
{ "bucket": { "map": {
"array": [
{ "bucket": { "map": {
"array": [
{ "bucket": { "map": {
"string": "o"
} } },
{ "bucket": { "map": {
"map": {
"p": { "bucket": { "map": {
"string": "q"
} } },
"r": { "bucket": { "map": {
"string": "s"
} } },
"t": { "bucket": { "map": {
"string": "u"
} } }
}
} } },
{ "bucket": { "map": { "array": [] } } },
{ "bucket": { "map": null } }
]
} } },
{ "bucket": { "map": {
"array": [
{ "bucket": { "map": {
"string": "v"
} } },
{ "bucket": { "map": {
"map": {
"w": { "bucket": { "map": { "string": "x" } } },
"y": { "bucket": { "map": { "string": "z" } } }
}
} } },
{
"bucket": {
"map": {
"array": [
{ "bucket": { "map": {
"array": [
{ "bucket": { "map": {
"string": "aa"
} } },
{ "bucket": { "map": {
"map": {
"ab": {
"bucket": {
"map": {
"string": "ac"
}
}
},
"ad": {
"bucket": {
"map": {
"string": "ae"
}
}
},
"af": {
"bucket": {
"map": {
"string": "ag"
}
}
},
"ah": {
"bucket": {
"map": {
"string": "ai"
}
}
}
}
} } },
{ "bucket": { "map": {
"array": []
} } },
{ "bucket": { "map": null } }
]
} } },
{ "bucket": { "map": {
"array": [
{ "bucket": { "map": {
"string": "aj"
} } },
{ "bucket": { "map": {
"map": {
"ak": {
"bucket": { "map": {
"string": "al"
} }
},
"am": {
"bucket": { "map": {
"string": "an"
} }
},
"ao": {
"bucket": { "map": {
"string": "ap"
} }
}
}
} } },
{ "bucket": { "map": {
"array": []
} } },
{ "bucket": { "map": {
"array": [
{ "bucket": { "map": {
"string": "q"
} } }
]
} } }
]
} } }
]
}
}
},
{ "bucket": { "map": {
"string": ""
} } }
]
} } }
]
} } },
{ "bucket": { "map": {
"string": ""
} } }
]
} } }
] } } },
{ "bucket": { "map": null } },
{ "bucket": { "map": { "boolean": false } } }
] } } }
} } } }
] }
},
},
].forEach( ( { label, delta_data, valid, expected } ) =>
{
it( label, () =>
{
const data = createMockData( delta_data );
const schema = createMockAvroSchema();
const sut = new Sut(
createMockEncoderCtor( schema ),
schema
const schema = avro_parse(
__dirname + '/../../src/system/avro/schema.avsc'
);
sut.avroEncode( data )
.then( b =>
{
expect( typeof(b) ).to.equal( 'object' );
expect( valid ).to.be.true;
} )
.catch( _ =>
{
expect( valid ).to.be.false;
} );
const sut = new Sut( createMockEncoderCtor( schema ), schema );
const result = sut.avroEncode( data );
if ( valid )
{
// return expect( result ).to.eventually.deep.equal(
// Buffer.from( '' )
// )
// .then( b =>
// {
// expect( typeof(b) ).to.equal( 'object' );
// } );
return result.catch( e =>
{
console.log( 'avroerror: ', e );
expect.fail();
} );
}
else
{
return Promise.all( [
expect( result ).to.eventually.be.rejected,
result.catch( e =>
{
if ( !hasContext( e ) )
{
return expect.fail();
}
return expect( e.context )
.to.deep.equal( expected );
} )
] );
}
} );
} );
} );
@ -205,7 +408,7 @@ describe( 'system.V1MessageWriter', () =>
label: 'Boolean Value',
delta_data: { foo: [ true ] },
expected: { foo: { 'array': [
{ 'boolean': true },
{ 'bucket': { 'map': { 'boolean': true } } },
] } },
},
{
@ -215,8 +418,8 @@ describe( 'system.V1MessageWriter', () =>
'baz',
] },
expected: { foo: { 'array': [
{ 'string': 'bar' },
{ 'string': 'baz' },
{ 'bucket': { 'map': { 'string': 'bar' } } },
{ 'bucket': { 'map': { 'string': 'baz' } } },
] } },
},
{
@ -225,7 +428,7 @@ describe( 'system.V1MessageWriter', () =>
123
] },
expected: { foo: { 'array': [
{ 'double': 123 },
{ 'bucket': { 'map': { 'double': 123 } } },
] } },
},
{
@ -237,10 +440,10 @@ describe( 'system.V1MessageWriter', () =>
]
] },
expected: { foo: { 'array': [
{ 'array': [
{ 'string': 'bar' },
{ 'string': 'baz' },
] },
{ 'bucket': { 'map': { 'array': [
{ 'bucket': { 'map': { 'string': 'bar' } } },
{ 'bucket': { 'map': { 'string': 'baz' } } },
] } } },
] } },
},
{
@ -255,13 +458,13 @@ describe( 'system.V1MessageWriter', () =>
],
] },
expected: { foo: { 'array': [
{ 'array': [
{ 'array': [
{ 'string': 'bar' },
{ 'double': 123 },
null,
] },
] },
{ 'bucket': { 'map': { 'array': [
{ 'bucket': { 'map': { 'array': [
{ 'bucket': { 'map': { 'string': 'bar' } } },
{ 'bucket': { 'map': { 'double': 123 } } },
{ 'bucket': { 'map': null } },
] } } },
] } } },
] } },
},
{
@ -272,13 +475,13 @@ describe( 'system.V1MessageWriter', () =>
null
] },
expected: { foo: { 'array': [
{ 'string': 'bar' },
{ 'string': 'baz' },
null
{ 'bucket': { 'map': { 'string': 'bar' } } },
{ 'bucket': { 'map': { 'string': 'baz' } } },
{ 'bucket': { 'map': null } },
] } },
},
{
label: 'Nested Array with mixed values',
label: 'Nested array with mixed values',
delta_data: { foo: [
[
'bar',
@ -287,15 +490,15 @@ describe( 'system.V1MessageWriter', () =>
]
] },
expected: { foo: { 'array': [
{ 'array': [
{ 'string': 'bar' },
{ 'double': 123321 },
null,
] },
] } },
{ 'bucket': { 'map': { 'array': [
{ 'bucket': { 'map': { 'string': 'bar' } } },
{ 'bucket': { 'map': { 'double': 123321 } } },
{ 'bucket': { 'map': null } },
] } } }
] } },
},
{
label: 'Nested Array with mixed values',
label: 'Nested map with mixed values',
delta_data: { foo: [
{
'bar': {
@ -307,16 +510,256 @@ describe( 'system.V1MessageWriter', () =>
},
] },
expected: { 'foo': { 'array': [
{ 'map': {
'bar': { 'map': {
'wer': { 'string': 'qaz' },
'qwe': { 'double': 1572903485000 },
'asd': { 'boolean': true },
'zxc': null,
} },
} },
{ 'bucket': { 'map': { 'map': { 'bar':
{ 'bucket': { 'map': { 'map': {
'wer': { 'bucket': { 'map': {
'string': 'qaz'
} } },
'qwe': { 'bucket': { 'map': {
'double': 1572903485000
} } },
'asd': { 'bucket': { 'map': {
'boolean': true
} } },
'zxc': { 'bucket': { 'map': null } }
} } } }
} } } }
] } },
},
{
label: 'Arbitrary array/map depth',
delta_data: {
"a": [
{
"b": [
"c",
[
[
"d",
{
"e": "f",
"g": "h",
"i": "j",
"k": "l",
"m": "n"
},
[
[
"o",
{
"p": "q",
"r": "s",
"t": "u"
},
[],
null
],
[
"v",
{
"w": "x",
"y": "z"
},
[
[
"aa",
{
"ab": "ac",
"ad": "ae",
"af": "ag",
"ah": "ai"
},
[],
null
],
[
"aj",
{
"ak": "al",
"am": "an",
"ao": "ap"
},
[],
[
"q"
]
]
],
""
]
],
""
]
],
null,
false
],
} ],
},
expected: {
"a": { "array": [
{ "bucket": { "map": { "map": {
"b": { "bucket": { "map": { "array": [
{ "bucket": { "map": {
"string": "c"
} } },
{ "bucket": { "map": { "array": [
{ "bucket": { "map": {
"array": [
{ "bucket": { "map": {
"string": "d"
} } },
{ "bucket": { "map": {
"map": {
"e": { "bucket": { "map": { "string": "f" } } },
"g": { "bucket": { "map": { "string": "h" } } },
"i": { "bucket": { "map": { "string": "j" } } },
"k": { "bucket": { "map": { "string": "l" } } },
"m": { "bucket": { "map": { "string": "n" } } }
}
} } },
{ "bucket": { "map": {
"array": [
{ "bucket": { "map": {
"array": [
{ "bucket": { "map": {
"string": "o"
} } },
{ "bucket": { "map": {
"map": {
"p": { "bucket": { "map": {
"string": "q"
} } },
"r": { "bucket": { "map": {
"string": "s"
} } },
"t": { "bucket": { "map": {
"string": "u"
} } }
}
} } },
{ "bucket": { "map": { "array": [] } } },
{ "bucket": { "map": null } }
]
} } },
{ "bucket": { "map": {
"array": [
{ "bucket": { "map": {
"string": "v"
} } },
{ "bucket": { "map": {
"map": {
"w": { "bucket": { "map": { "string": "x" } } },
"y": { "bucket": { "map": { "string": "z" } } }
}
} } },
{
"bucket": {
"map": {
"array": [
{ "bucket": { "map": {
"array": [
{ "bucket": { "map": {
"string": "aa"
} } },
{ "bucket": { "map": {
"map": {
"ab": {
"bucket": {
"map": {
"string": "ac"
}
}
},
"ad": {
"bucket": {
"map": {
"string": "ae"
}
}
},
"af": {
"bucket": {
"map": {
"string": "ag"
}
}
},
"ah": {
"bucket": {
"map": {
"string": "ai"
}
}
}
}
} } },
{ "bucket": { "map": {
"array": []
} } },
{ "bucket": { "map": null } }
]
} } },
{ "bucket": { "map": {
"array": [
{ "bucket": { "map": {
"string": "aj"
} } },
{ "bucket": { "map": {
"map": {
"ak": {
"bucket": { "map": {
"string": "al"
} }
},
"am": {
"bucket": { "map": {
"string": "an"
} }
},
"ao": {
"bucket": { "map": {
"string": "ap"
} }
}
}
} } },
{ "bucket": { "map": {
"array": []
} } },
{ "bucket": { "map": {
"array": [
{ "bucket": { "map": {
"string": "q"
} } }
]
} } }
]
} } }
]
}
}
},
{ "bucket": { "map": {
"string": ""
} } }
]
} } }
]
} } },
{ "bucket": { "map": {
"string": ""
} } }
]
} } }
] } } },
{ "bucket": { "map": null } },
{ "bucket": { "map": { "boolean": false } } }
] } } }
} } } }
] }
},
},
].forEach( ( { label, delta_data, expected } ) =>
{
it( label, () =>
@ -328,6 +771,7 @@ describe( 'system.V1MessageWriter', () =>
avroEncoderCtr,
stub_schema,
);
const actual = sut.setDataTypes( delta_data );
expect( actual ).to.deep.equal( expected );
@ -388,8 +832,8 @@ describe( 'system.V1MessageWriter', () =>
Data: {
bucket: {
'foo': { 'array': [
{ 'string': 'bar' },
{ 'string': 'baz' },
{ 'bucket': { 'map': { 'string': 'bar' } } },
{ 'bucket': { 'map': { 'string': 'baz' } } },
] }
},
},
@ -460,10 +904,16 @@ function createMockData( delta_data: any ): any
},
document: {
id: 123123,
created: 1573856916,
modified: 1573856916,
created: { 'long': 157385691600 },
modified: { 'long': 257381491600 },
top_visited_step: '2',
},
session: {
Session: {
entity_name: 'Foo',
entity_id: 123,
},
},
data: null,
ratedata: null,
delta: {