DeltaProcessor: Correct delta application
Deltas must be applied in reverse to reproduce the state of the bucket or rate data at that point in time, which is then paired with the delta that will transform it into the previous state.master
parent
819701eca3
commit
9eb1f3afca
|
@ -31,9 +31,18 @@ import {
|
|||
ReverseDelta,
|
||||
} from "../bucket/delta";
|
||||
|
||||
/** Deltas and state of data prior to their application */
|
||||
type DeltaState = [
|
||||
Delta<any>,
|
||||
Record<string, any>,
|
||||
Record<string, any>,
|
||||
];
|
||||
|
||||
|
||||
/**
|
||||
* Process deltas for a quote and publish to a queue
|
||||
*
|
||||
* TODO: Decouple from applyDelta
|
||||
*/
|
||||
export class DeltaProcessor
|
||||
{
|
||||
|
@ -87,10 +96,12 @@ export class DeltaProcessor
|
|||
const deltas = this._getTimestampSortedDeltas( doc );
|
||||
const doc_id = doc.id;
|
||||
const bucket = doc.data;
|
||||
const ratedata = doc.ratedata;
|
||||
const ratedata = doc.ratedata || {};
|
||||
const last_updated_ts = doc.lastUpdate;
|
||||
|
||||
return this._processNextDelta( doc_id, deltas, bucket, ratedata )
|
||||
const history = this._applyDeltas( deltas, bucket, ratedata );
|
||||
|
||||
return this._processNextDelta( doc_id, history )
|
||||
.then( _ =>
|
||||
this._dao.markDocumentAsProcessed( doc_id, last_updated_ts )
|
||||
)
|
||||
|
@ -106,41 +117,90 @@ export class DeltaProcessor
|
|||
}
|
||||
|
||||
|
||||
/**
|
||||
* Produce states of buckets at each point in history
|
||||
*
|
||||
* For bucket data, each tuple will contain the state of the bucket
|
||||
* prior to the corresponding delta having been applied. For rate data,
|
||||
* the tuple will also contain the state of the bucket at the point of
|
||||
* rating.
|
||||
*
|
||||
* @param deltas - deltas to apply
|
||||
* @param bucket - current state of bucket prior to deltas
|
||||
* @param ratedata - current state of ratedata prior to deltas
|
||||
*
|
||||
* @return deltas paired with state prior to its application
|
||||
*/
|
||||
private _applyDeltas(
|
||||
deltas: Delta<any>[],
|
||||
bucket: Record<string, any>,
|
||||
ratedata: Record<string, any>,
|
||||
): DeltaState[]
|
||||
{
|
||||
const pairs: DeltaState[] = [];
|
||||
|
||||
let bucket_state = bucket;
|
||||
let ratedata_state = ratedata;
|
||||
let i = deltas.length;
|
||||
|
||||
while ( i-- )
|
||||
{
|
||||
let delta = deltas[ i ];
|
||||
|
||||
pairs[ i ] = [
|
||||
delta,
|
||||
bucket_state,
|
||||
( delta.type === this.DELTA_RATEDATA ) ? ratedata_state : {},
|
||||
];
|
||||
|
||||
// Don't apply the final delta, since we won't use it
|
||||
if ( i === 0 )
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
if ( delta.type === this.DELTA_DATA )
|
||||
{
|
||||
bucket_state = applyDelta(
|
||||
Object.create( bucket_state ),
|
||||
deltas[ i ].data,
|
||||
);
|
||||
}
|
||||
else
|
||||
{
|
||||
ratedata_state = applyDelta(
|
||||
Object.create( ratedata_state ),
|
||||
deltas[ i ].data,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
return pairs;
|
||||
}
|
||||
|
||||
|
||||
private _processNextDelta(
|
||||
doc_id: DocumentId,
|
||||
deltas: Delta<any>[],
|
||||
bucket: Record<string, any>,
|
||||
ratedata?: Record<string, any>,
|
||||
doc_id: DocumentId,
|
||||
history: DeltaState[],
|
||||
): Promise<void>
|
||||
{
|
||||
const delta = deltas.shift();
|
||||
|
||||
if ( !delta )
|
||||
if ( history.length === 0 )
|
||||
{
|
||||
return Promise.resolve();
|
||||
}
|
||||
|
||||
const [ delta, bucket, ratedata ] = history[ 0 ];
|
||||
|
||||
const delta_uid = doc_id + '_' + delta.timestamp + '_' + delta.type;
|
||||
|
||||
this._emitter.emit( 'delta-process-start', delta_uid );
|
||||
|
||||
if ( delta.type == this.DELTA_DATA )
|
||||
{
|
||||
bucket = applyDelta( bucket, delta.data );
|
||||
}
|
||||
else
|
||||
{
|
||||
ratedata = applyDelta( ratedata, delta.data );
|
||||
}
|
||||
|
||||
return this._publisher.publish( doc_id, delta, bucket, ratedata )
|
||||
.then( _ => this._dao.advanceDeltaIndex( doc_id, delta.type ) )
|
||||
.then( _ => this._emitter.emit( 'delta-process-end', delta_uid ) )
|
||||
.then( _ => this._processNextDelta(
|
||||
doc_id,
|
||||
deltas,
|
||||
bucket,
|
||||
ratedata
|
||||
history.slice( 1 ),
|
||||
) );
|
||||
}
|
||||
|
||||
|
|
|
@ -40,7 +40,7 @@ describe( 'system.DeltaProcessor', () =>
|
|||
expected: any
|
||||
}[]>[
|
||||
{
|
||||
label: 'No deltas are processed',
|
||||
label: "No deltas are processed",
|
||||
given: [
|
||||
{
|
||||
id: 123,
|
||||
|
@ -52,168 +52,313 @@ describe( 'system.DeltaProcessor', () =>
|
|||
],
|
||||
expected: [],
|
||||
},
|
||||
|
||||
// when quote is initialized: { foo: [ "" ], state: [ "a" ] }
|
||||
{
|
||||
label: 'Publishes deltas in order',
|
||||
label: "Publishes deltas in order",
|
||||
|
||||
given: [
|
||||
{
|
||||
id: 123,
|
||||
lastUpdate: 123123123,
|
||||
data: { foo: [ 'start_bar' ] },
|
||||
ratedata: {},
|
||||
|
||||
data: {
|
||||
foo: [ "third" ],
|
||||
state: [ "a", "b", "c", "d" ],
|
||||
},
|
||||
|
||||
ratedata: {
|
||||
prem: [ "rate_second" ],
|
||||
state: [ "i", "ii", "iii" ],
|
||||
},
|
||||
|
||||
rdelta: {
|
||||
data: [
|
||||
{
|
||||
data: { foo: [ 'first_bar' ] },
|
||||
timestamp: 123123,
|
||||
timestamp: 1,
|
||||
data: {
|
||||
foo: [ "" ],
|
||||
state: [ undefined, null ],
|
||||
},
|
||||
},
|
||||
{
|
||||
data: { foo: [ 'second_bar' ] },
|
||||
timestamp: 234123,
|
||||
timestamp: 3,
|
||||
data: {
|
||||
foo: [ "first" ],
|
||||
state: [ undefined, undefined, null ],
|
||||
},
|
||||
},
|
||||
{
|
||||
timestamp: 5,
|
||||
data: {
|
||||
foo: [ "second" ],
|
||||
state: [ undefined, undefined, undefined, null ],
|
||||
},
|
||||
},
|
||||
],
|
||||
|
||||
ratedata: [
|
||||
{
|
||||
timestamp: 2,
|
||||
data: {
|
||||
prem: [ "" ],
|
||||
state: [ undefined, null ],
|
||||
},
|
||||
},
|
||||
{
|
||||
timestamp: 4,
|
||||
data: {
|
||||
prem: [ "rate_first" ],
|
||||
state: [ undefined, undefined, null ],
|
||||
},
|
||||
},
|
||||
],
|
||||
},
|
||||
},
|
||||
],
|
||||
|
||||
expected: [
|
||||
// bucket
|
||||
{
|
||||
doc_id: 123,
|
||||
delta: { foo: [ 'first_bar' ] },
|
||||
bucket: { foo: [ 'first_bar' ] },
|
||||
rdelta: {
|
||||
foo: [ "" ],
|
||||
state: [ undefined, null ],
|
||||
},
|
||||
bucket: {
|
||||
foo: [ "first" ],
|
||||
state: [ "a", "b" ],
|
||||
},
|
||||
ratedata: {},
|
||||
},
|
||||
|
||||
// rate
|
||||
{
|
||||
doc_id: 123,
|
||||
delta: { foo: [ 'second_bar' ] },
|
||||
bucket: { foo: [ 'second_bar' ] },
|
||||
rdelta: {
|
||||
prem: [ "" ],
|
||||
state: [ undefined, null ],
|
||||
},
|
||||
bucket: {
|
||||
foo: [ "first" ],
|
||||
state: [ "a", "b" ],
|
||||
},
|
||||
ratedata: {
|
||||
prem: [ "rate_first" ],
|
||||
state: [ "i", "ii" ],
|
||||
},
|
||||
},
|
||||
|
||||
// bucket
|
||||
{
|
||||
doc_id: 123,
|
||||
rdelta: {
|
||||
foo: [ "first" ],
|
||||
state: [ undefined, undefined, null ],
|
||||
},
|
||||
bucket: {
|
||||
foo: [ "second" ],
|
||||
state: [ "a", "b", "c" ],
|
||||
},
|
||||
ratedata: {},
|
||||
},
|
||||
|
||||
// rate
|
||||
{
|
||||
doc_id: 123,
|
||||
rdelta: {
|
||||
prem: [ "rate_first" ],
|
||||
state: [ undefined, undefined, null ],
|
||||
},
|
||||
bucket: {
|
||||
foo: [ "second" ],
|
||||
state: [ "a", "b", "c" ],
|
||||
},
|
||||
ratedata: {
|
||||
prem: [ "rate_second" ],
|
||||
state: [ "i", "ii", "iii" ],
|
||||
},
|
||||
},
|
||||
|
||||
// bucket
|
||||
{
|
||||
doc_id: 123,
|
||||
rdelta: {
|
||||
foo: [ "second" ],
|
||||
state: [ undefined, undefined, undefined, null ],
|
||||
},
|
||||
bucket: {
|
||||
foo: [ "third" ],
|
||||
state: [ "a", "b", "c", "d" ],
|
||||
},
|
||||
ratedata: {},
|
||||
},
|
||||
],
|
||||
},
|
||||
|
||||
{
|
||||
label: 'Publishes deltas in order for multiple documents',
|
||||
label: "Publishes deltas in order for multiple documents",
|
||||
|
||||
given: [
|
||||
{
|
||||
id: 123,
|
||||
lastUpdate: 123123123,
|
||||
data: { foo: [ 'start_bar_123' ] },
|
||||
ratedata: {},
|
||||
|
||||
data: {
|
||||
foo: [ "first" ],
|
||||
state: [ "a", "b" ],
|
||||
},
|
||||
|
||||
ratedata: {
|
||||
prem: [ "rate_first" ],
|
||||
state: [ "i", "ii" ],
|
||||
},
|
||||
|
||||
rdelta: {
|
||||
data: [
|
||||
{
|
||||
data: { foo: [ 'second_bar_123' ] },
|
||||
timestamp: 234,
|
||||
timestamp: 1,
|
||||
data: {
|
||||
foo: [ "" ],
|
||||
state: [ undefined, null ],
|
||||
},
|
||||
},
|
||||
],
|
||||
|
||||
ratedata: [
|
||||
{
|
||||
data: { foo: [ 'first_bar_123' ] },
|
||||
timestamp: 123,
|
||||
timestamp: 4,
|
||||
data: {
|
||||
prem: [ "" ],
|
||||
state: [ undefined, null ],
|
||||
},
|
||||
},
|
||||
],
|
||||
},
|
||||
},
|
||||
|
||||
// timestamps of this document are sandwiched between
|
||||
// the above to make sure documents are processed
|
||||
// independently (without splicing their deltas together)
|
||||
{
|
||||
id: 234,
|
||||
lastUpdate: 123123123,
|
||||
data: { foo: [ 'start_bar_234' ] },
|
||||
ratedata: {},
|
||||
lastUpdate: 121212123,
|
||||
|
||||
data: {
|
||||
foo2: [ "first" ],
|
||||
state: [ "a", "b" ],
|
||||
},
|
||||
|
||||
ratedata: {
|
||||
prem2: [ "rate_first" ],
|
||||
state: [ "i", "ii" ],
|
||||
},
|
||||
|
||||
rdelta: {
|
||||
data: [
|
||||
{
|
||||
data: { foo: [ 'first_bar_234' ] },
|
||||
timestamp: 123,
|
||||
},
|
||||
{
|
||||
data: { foo: [ 'second_bar_234' ] },
|
||||
timestamp: 234,
|
||||
},
|
||||
{
|
||||
data: { foo: [ 'third_bar_234' ] },
|
||||
timestamp: 345,
|
||||
timestamp: 2,
|
||||
data: {
|
||||
foo2: [ "" ],
|
||||
state: [ undefined, null ],
|
||||
},
|
||||
},
|
||||
],
|
||||
},
|
||||
},
|
||||
{
|
||||
id: 345,
|
||||
lastUpdate: 123123123,
|
||||
data: { foo: [ 'start_bar_345' ] },
|
||||
ratedata: {},
|
||||
rdelta: {
|
||||
|
||||
ratedata: [
|
||||
{
|
||||
data: { foo: [ 'first_bar_345' ] },
|
||||
timestamp: 123,
|
||||
},
|
||||
{
|
||||
data: { foo: [ 'second_bar_345' ] },
|
||||
timestamp: 234,
|
||||
timestamp: 3,
|
||||
data: {
|
||||
prem2: [ "" ],
|
||||
state: [ undefined, null ],
|
||||
},
|
||||
},
|
||||
],
|
||||
},
|
||||
},
|
||||
],
|
||||
|
||||
expected: [
|
||||
// bucket
|
||||
{
|
||||
doc_id: 123,
|
||||
delta: { foo: [ 'first_bar_123' ] },
|
||||
bucket: { foo: [ 'start_bar_123' ] },
|
||||
ratedata: { foo: [ 'first_bar_123' ] },
|
||||
rdelta: {
|
||||
foo: [ "" ],
|
||||
state: [ undefined, null ],
|
||||
},
|
||||
bucket: {
|
||||
foo: [ "first" ],
|
||||
state: [ "a", "b" ],
|
||||
},
|
||||
ratedata: {},
|
||||
},
|
||||
|
||||
// rate
|
||||
{
|
||||
doc_id: 123,
|
||||
delta: { foo: [ 'second_bar_123' ] },
|
||||
bucket: { foo: [ 'second_bar_123' ] },
|
||||
ratedata: { foo: [ 'first_bar_123' ] },
|
||||
rdelta: {
|
||||
prem: [ "" ],
|
||||
state: [ undefined, null ],
|
||||
},
|
||||
bucket: {
|
||||
foo: [ "first" ],
|
||||
state: [ "a", "b" ],
|
||||
},
|
||||
ratedata: {
|
||||
prem: [ "rate_first" ],
|
||||
state: [ "i", "ii" ],
|
||||
},
|
||||
},
|
||||
|
||||
// bucket
|
||||
{
|
||||
doc_id: 234,
|
||||
delta: { foo: [ 'first_bar_234' ] },
|
||||
bucket: { foo: [ 'first_bar_234' ] },
|
||||
rdelta: {
|
||||
foo2: [ "" ],
|
||||
state: [ undefined, null ],
|
||||
},
|
||||
bucket: {
|
||||
foo2: [ "first" ],
|
||||
state: [ "a", "b" ],
|
||||
},
|
||||
ratedata: {},
|
||||
},
|
||||
|
||||
// rate
|
||||
{
|
||||
doc_id: 234,
|
||||
delta: { foo: [ 'second_bar_234' ] },
|
||||
bucket: { foo: [ 'second_bar_234' ] },
|
||||
ratedata: {},
|
||||
},
|
||||
{
|
||||
doc_id: 234,
|
||||
delta: { foo: [ 'third_bar_234' ] },
|
||||
bucket: { foo: [ 'third_bar_234' ] },
|
||||
ratedata: {},
|
||||
},
|
||||
{
|
||||
doc_id: 345,
|
||||
delta: { foo: [ 'first_bar_345' ] },
|
||||
bucket: { foo: [ 'start_bar_345' ] },
|
||||
ratedata: { foo: [ 'first_bar_345' ] },
|
||||
},
|
||||
{
|
||||
doc_id: 345,
|
||||
delta: { foo: [ 'second_bar_345' ] },
|
||||
bucket: { foo: [ 'start_bar_345' ] },
|
||||
ratedata: { foo: [ 'second_bar_345' ] },
|
||||
rdelta: {
|
||||
prem2: [ "" ],
|
||||
state: [ undefined, null ],
|
||||
},
|
||||
bucket: {
|
||||
foo2: [ "first" ],
|
||||
state: [ "a", "b" ],
|
||||
},
|
||||
ratedata: {
|
||||
prem2: [ "rate_first" ],
|
||||
state: [ "i", "ii" ],
|
||||
},
|
||||
},
|
||||
],
|
||||
},
|
||||
|
||||
{
|
||||
label: 'trims delta array based on index',
|
||||
label: "trims delta array based on index",
|
||||
given: [
|
||||
{
|
||||
id: 111,
|
||||
lastUpdate: 123123123,
|
||||
data: { foo: [ 'bar' ] },
|
||||
data: { foo: [ "second" ] },
|
||||
ratedata: {},
|
||||
rdelta: {
|
||||
data: [
|
||||
{
|
||||
data: { foo: [ 'first_bar' ] },
|
||||
data: { foo: [ "" ] },
|
||||
timestamp: 123,
|
||||
},
|
||||
{
|
||||
data: { foo: [ 'second_bar' ] },
|
||||
data: { foo: [ "first" ] },
|
||||
timestamp: 234,
|
||||
},
|
||||
],
|
||||
|
@ -226,8 +371,8 @@ describe( 'system.DeltaProcessor', () =>
|
|||
expected: [
|
||||
{
|
||||
doc_id: 111,
|
||||
delta: { foo: [ 'second_bar' ] },
|
||||
bucket: { foo: [ 'second_bar' ] },
|
||||
rdelta: { foo: [ "first" ] },
|
||||
bucket: { foo: [ "second" ] },
|
||||
ratedata: {}
|
||||
},
|
||||
],
|
||||
|
@ -253,7 +398,7 @@ describe( 'system.DeltaProcessor', () =>
|
|||
{
|
||||
published.push( {
|
||||
doc_id: doc_id,
|
||||
delta: delta.data,
|
||||
rdelta: delta.data,
|
||||
bucket: bucket,
|
||||
ratedata: ratedata,
|
||||
} );
|
||||
|
@ -314,13 +459,13 @@ describe( 'system.DeltaProcessor', () =>
|
|||
{
|
||||
doc_id: 123,
|
||||
delta: { foo: [ 'first_bar' ] },
|
||||
bucket: { foo: [ 'first_bar' ] },
|
||||
bucket: { foo: [ 'start_bar' ] },
|
||||
ratedata: {},
|
||||
},
|
||||
{
|
||||
doc_id: 234,
|
||||
delta: { foo: [ 'first_bar' ] },
|
||||
bucket: { foo: [ 'first_bar' ] },
|
||||
bucket: { foo: [ 'start_bar' ] },
|
||||
ratedata: {},
|
||||
}
|
||||
];
|
||||
|
@ -416,7 +561,7 @@ describe( 'system.DeltaProcessor', () =>
|
|||
const expected_published = [ {
|
||||
doc_id: 123,
|
||||
delta: { foo: [ 'first_bar' ] },
|
||||
bucket: { foo: [ 'first_bar' ] },
|
||||
bucket: { foo: [ 'start_bar' ] },
|
||||
ratedata: {},
|
||||
} ];
|
||||
|
||||
|
|
Loading…
Reference in New Issue