diff --git a/src/system/DeltaProcessor.ts b/src/system/DeltaProcessor.ts index d68615f..0e5201a 100644 --- a/src/system/DeltaProcessor.ts +++ b/src/system/DeltaProcessor.ts @@ -31,9 +31,18 @@ import { ReverseDelta, } from "../bucket/delta"; +/** Deltas and state of data prior to their application */ +type DeltaState = [ + Delta, + Record, + Record, +]; + /** * Process deltas for a quote and publish to a queue + * + * TODO: Decouple from applyDelta */ export class DeltaProcessor { @@ -87,10 +96,12 @@ export class DeltaProcessor const deltas = this._getTimestampSortedDeltas( doc ); const doc_id = doc.id; const bucket = doc.data; - const ratedata = doc.ratedata; + const ratedata = doc.ratedata || {}; const last_updated_ts = doc.lastUpdate; - return this._processNextDelta( doc_id, deltas, bucket, ratedata ) + const history = this._applyDeltas( deltas, bucket, ratedata ); + + return this._processNextDelta( doc_id, history ) .then( _ => this._dao.markDocumentAsProcessed( doc_id, last_updated_ts ) ) @@ -106,41 +117,90 @@ export class DeltaProcessor } + /** + * Produce states of buckets at each point in history + * + * For bucket data, each tuple will contain the state of the bucket + * prior to the corresponding delta having been applied. For rate data, + * the tuple will also contain the state of the bucket at the point of + * rating. + * + * @param deltas - deltas to apply + * @param bucket - current state of bucket prior to deltas + * @param ratedata - current state of ratedata prior to deltas + * + * @return deltas paired with state prior to its application + */ + private _applyDeltas( + deltas: Delta[], + bucket: Record, + ratedata: Record, + ): DeltaState[] + { + const pairs: DeltaState[] = []; + + let bucket_state = bucket; + let ratedata_state = ratedata; + let i = deltas.length; + + while ( i-- ) + { + let delta = deltas[ i ]; + + pairs[ i ] = [ + delta, + bucket_state, + ( delta.type === this.DELTA_RATEDATA ) ? ratedata_state : {}, + ]; + + // Don't apply the final delta, since we won't use it + if ( i === 0 ) + { + break; + } + + if ( delta.type === this.DELTA_DATA ) + { + bucket_state = applyDelta( + Object.create( bucket_state ), + deltas[ i ].data, + ); + } + else + { + ratedata_state = applyDelta( + Object.create( ratedata_state ), + deltas[ i ].data, + ); + } + } + + return pairs; + } + + private _processNextDelta( - doc_id: DocumentId, - deltas: Delta[], - bucket: Record, - ratedata?: Record, + doc_id: DocumentId, + history: DeltaState[], ): Promise { - const delta = deltas.shift(); - - if ( !delta ) + if ( history.length === 0 ) { return Promise.resolve(); } + const [ delta, bucket, ratedata ] = history[ 0 ]; + const delta_uid = doc_id + '_' + delta.timestamp + '_' + delta.type; this._emitter.emit( 'delta-process-start', delta_uid ); - if ( delta.type == this.DELTA_DATA ) - { - bucket = applyDelta( bucket, delta.data ); - } - else - { - ratedata = applyDelta( ratedata, delta.data ); - } - return this._publisher.publish( doc_id, delta, bucket, ratedata ) .then( _ => this._dao.advanceDeltaIndex( doc_id, delta.type ) ) .then( _ => this._emitter.emit( 'delta-process-end', delta_uid ) ) .then( _ => this._processNextDelta( doc_id, - deltas, - bucket, - ratedata + history.slice( 1 ), ) ); } diff --git a/test/system/DeltaProcessorTest.ts b/test/system/DeltaProcessorTest.ts index 0b7c35d..2cd68e2 100644 --- a/test/system/DeltaProcessorTest.ts +++ b/test/system/DeltaProcessorTest.ts @@ -40,7 +40,7 @@ describe( 'system.DeltaProcessor', () => expected: any }[]>[ { - label: 'No deltas are processed', + label: "No deltas are processed", given: [ { id: 123, @@ -52,168 +52,313 @@ describe( 'system.DeltaProcessor', () => ], expected: [], }, + + // when quote is initialized: { foo: [ "" ], state: [ "a" ] } { - label: 'Publishes deltas in order', + label: "Publishes deltas in order", + given: [ { id: 123, lastUpdate: 123123123, - data: { foo: [ 'start_bar' ] }, - ratedata: {}, + + data: { + foo: [ "third" ], + state: [ "a", "b", "c", "d" ], + }, + + ratedata: { + prem: [ "rate_second" ], + state: [ "i", "ii", "iii" ], + }, + rdelta: { data: [ { - data: { foo: [ 'first_bar' ] }, - timestamp: 123123, + timestamp: 1, + data: { + foo: [ "" ], + state: [ undefined, null ], + }, }, { - data: { foo: [ 'second_bar' ] }, - timestamp: 234123, + timestamp: 3, + data: { + foo: [ "first" ], + state: [ undefined, undefined, null ], + }, + }, + { + timestamp: 5, + data: { + foo: [ "second" ], + state: [ undefined, undefined, undefined, null ], + }, + }, + ], + + ratedata: [ + { + timestamp: 2, + data: { + prem: [ "" ], + state: [ undefined, null ], + }, + }, + { + timestamp: 4, + data: { + prem: [ "rate_first" ], + state: [ undefined, undefined, null ], + }, }, ], }, }, ], + expected: [ + // bucket { doc_id: 123, - delta: { foo: [ 'first_bar' ] }, - bucket: { foo: [ 'first_bar' ] }, + rdelta: { + foo: [ "" ], + state: [ undefined, null ], + }, + bucket: { + foo: [ "first" ], + state: [ "a", "b" ], + }, ratedata: {}, }, + + // rate { doc_id: 123, - delta: { foo: [ 'second_bar' ] }, - bucket: { foo: [ 'second_bar' ] }, + rdelta: { + prem: [ "" ], + state: [ undefined, null ], + }, + bucket: { + foo: [ "first" ], + state: [ "a", "b" ], + }, + ratedata: { + prem: [ "rate_first" ], + state: [ "i", "ii" ], + }, + }, + + // bucket + { + doc_id: 123, + rdelta: { + foo: [ "first" ], + state: [ undefined, undefined, null ], + }, + bucket: { + foo: [ "second" ], + state: [ "a", "b", "c" ], + }, + ratedata: {}, + }, + + // rate + { + doc_id: 123, + rdelta: { + prem: [ "rate_first" ], + state: [ undefined, undefined, null ], + }, + bucket: { + foo: [ "second" ], + state: [ "a", "b", "c" ], + }, + ratedata: { + prem: [ "rate_second" ], + state: [ "i", "ii", "iii" ], + }, + }, + + // bucket + { + doc_id: 123, + rdelta: { + foo: [ "second" ], + state: [ undefined, undefined, undefined, null ], + }, + bucket: { + foo: [ "third" ], + state: [ "a", "b", "c", "d" ], + }, ratedata: {}, }, ], }, + { - label: 'Publishes deltas in order for multiple documents', + label: "Publishes deltas in order for multiple documents", + given: [ { id: 123, lastUpdate: 123123123, - data: { foo: [ 'start_bar_123' ] }, - ratedata: {}, + + data: { + foo: [ "first" ], + state: [ "a", "b" ], + }, + + ratedata: { + prem: [ "rate_first" ], + state: [ "i", "ii" ], + }, + rdelta: { data: [ { - data: { foo: [ 'second_bar_123' ] }, - timestamp: 234, + timestamp: 1, + data: { + foo: [ "" ], + state: [ undefined, null ], + }, }, ], + ratedata: [ { - data: { foo: [ 'first_bar_123' ] }, - timestamp: 123, + timestamp: 4, + data: { + prem: [ "" ], + state: [ undefined, null ], + }, }, ], }, }, + + // timestamps of this document are sandwiched between + // the above to make sure documents are processed + // independently (without splicing their deltas together) { id: 234, - lastUpdate: 123123123, - data: { foo: [ 'start_bar_234' ] }, - ratedata: {}, + lastUpdate: 121212123, + + data: { + foo2: [ "first" ], + state: [ "a", "b" ], + }, + + ratedata: { + prem2: [ "rate_first" ], + state: [ "i", "ii" ], + }, + rdelta: { data: [ { - data: { foo: [ 'first_bar_234' ] }, - timestamp: 123, - }, - { - data: { foo: [ 'second_bar_234' ] }, - timestamp: 234, - }, - { - data: { foo: [ 'third_bar_234' ] }, - timestamp: 345, + timestamp: 2, + data: { + foo2: [ "" ], + state: [ undefined, null ], + }, }, ], - }, - }, - { - id: 345, - lastUpdate: 123123123, - data: { foo: [ 'start_bar_345' ] }, - ratedata: {}, - rdelta: { + ratedata: [ { - data: { foo: [ 'first_bar_345' ] }, - timestamp: 123, - }, - { - data: { foo: [ 'second_bar_345' ] }, - timestamp: 234, + timestamp: 3, + data: { + prem2: [ "" ], + state: [ undefined, null ], + }, }, ], }, }, ], + expected: [ + // bucket { doc_id: 123, - delta: { foo: [ 'first_bar_123' ] }, - bucket: { foo: [ 'start_bar_123' ] }, - ratedata: { foo: [ 'first_bar_123' ] }, + rdelta: { + foo: [ "" ], + state: [ undefined, null ], + }, + bucket: { + foo: [ "first" ], + state: [ "a", "b" ], + }, + ratedata: {}, }, + + // rate { doc_id: 123, - delta: { foo: [ 'second_bar_123' ] }, - bucket: { foo: [ 'second_bar_123' ] }, - ratedata: { foo: [ 'first_bar_123' ] }, + rdelta: { + prem: [ "" ], + state: [ undefined, null ], + }, + bucket: { + foo: [ "first" ], + state: [ "a", "b" ], + }, + ratedata: { + prem: [ "rate_first" ], + state: [ "i", "ii" ], + }, }, + + // bucket { doc_id: 234, - delta: { foo: [ 'first_bar_234' ] }, - bucket: { foo: [ 'first_bar_234' ] }, + rdelta: { + foo2: [ "" ], + state: [ undefined, null ], + }, + bucket: { + foo2: [ "first" ], + state: [ "a", "b" ], + }, ratedata: {}, }, + + // rate { doc_id: 234, - delta: { foo: [ 'second_bar_234' ] }, - bucket: { foo: [ 'second_bar_234' ] }, - ratedata: {}, - }, - { - doc_id: 234, - delta: { foo: [ 'third_bar_234' ] }, - bucket: { foo: [ 'third_bar_234' ] }, - ratedata: {}, - }, - { - doc_id: 345, - delta: { foo: [ 'first_bar_345' ] }, - bucket: { foo: [ 'start_bar_345' ] }, - ratedata: { foo: [ 'first_bar_345' ] }, - }, - { - doc_id: 345, - delta: { foo: [ 'second_bar_345' ] }, - bucket: { foo: [ 'start_bar_345' ] }, - ratedata: { foo: [ 'second_bar_345' ] }, + rdelta: { + prem2: [ "" ], + state: [ undefined, null ], + }, + bucket: { + foo2: [ "first" ], + state: [ "a", "b" ], + }, + ratedata: { + prem2: [ "rate_first" ], + state: [ "i", "ii" ], + }, }, ], }, + { - label: 'trims delta array based on index', + label: "trims delta array based on index", given: [ { id: 111, lastUpdate: 123123123, - data: { foo: [ 'bar' ] }, + data: { foo: [ "second" ] }, ratedata: {}, rdelta: { data: [ { - data: { foo: [ 'first_bar' ] }, + data: { foo: [ "" ] }, timestamp: 123, }, { - data: { foo: [ 'second_bar' ] }, + data: { foo: [ "first" ] }, timestamp: 234, }, ], @@ -226,8 +371,8 @@ describe( 'system.DeltaProcessor', () => expected: [ { doc_id: 111, - delta: { foo: [ 'second_bar' ] }, - bucket: { foo: [ 'second_bar' ] }, + rdelta: { foo: [ "first" ] }, + bucket: { foo: [ "second" ] }, ratedata: {} }, ], @@ -253,7 +398,7 @@ describe( 'system.DeltaProcessor', () => { published.push( { doc_id: doc_id, - delta: delta.data, + rdelta: delta.data, bucket: bucket, ratedata: ratedata, } ); @@ -314,13 +459,13 @@ describe( 'system.DeltaProcessor', () => { doc_id: 123, delta: { foo: [ 'first_bar' ] }, - bucket: { foo: [ 'first_bar' ] }, + bucket: { foo: [ 'start_bar' ] }, ratedata: {}, }, { doc_id: 234, delta: { foo: [ 'first_bar' ] }, - bucket: { foo: [ 'first_bar' ] }, + bucket: { foo: [ 'start_bar' ] }, ratedata: {}, } ]; @@ -416,7 +561,7 @@ describe( 'system.DeltaProcessor', () => const expected_published = [ { doc_id: 123, delta: { foo: [ 'first_bar' ] }, - bucket: { foo: [ 'first_bar' ] }, + bucket: { foo: [ 'start_bar' ] }, ratedata: {}, } ];