diff --git a/src/bucket/delta.ts b/src/bucket/delta.ts index bc48dd1..6b943e2 100644 --- a/src/bucket/delta.ts +++ b/src/bucket/delta.ts @@ -53,9 +53,10 @@ export type DeltaResult = { [K in keyof T]: DeltaDatum | null }; /** Complete delta type */ export type Delta = { - type: DeltaType, - timestamp: UnixTimestamp, - data: DeltaResult, + type: DeltaType, + timestamp: UnixTimestamp, + concluding_save: boolean, + data: DeltaResult, } diff --git a/src/server/Server.js b/src/server/Server.js index d839d7b..8e28180 100644 --- a/src/server/Server.js +++ b/src/server/Server.js @@ -1144,9 +1144,10 @@ module.exports = Class( 'Server' ) { try { - var rdelta_data; - var parsed_data = JSON.parse( post_data.data ); - var bucket = quote.getBucket(); + var rdelta_data; + var parsed_data = JSON.parse( post_data.data ); + var bucket = quote.getBucket(); + const concluding_save = post_data.concluding_save; const { filtered, dapis, meta_clear, rdiff } = server._dataProcessor.processDiff( @@ -1158,8 +1159,9 @@ module.exports = Class( 'Server' ) { rdelta_data = { "rdelta.data": { - data: rdiff, - timestamp: Math.round( + data: rdiff, + concluding_save: ( concluding_save === 'true' ), + timestamp: Math.round( new Date().getTime() / 1000 ), } diff --git a/src/server/service/RatingService.ts b/src/server/service/RatingService.ts index a81d3f2..4c6fd75 100644 --- a/src/server/service/RatingService.ts +++ b/src/server/service/RatingService.ts @@ -264,8 +264,9 @@ export class RatingService const save_data = { ratedata: data }; const rdelta_data = { "rdelta.ratedata": { - data: this._createDelta( data, quote_data ), - timestamp: cur_date + data: this._createDelta( data, quote_data ), + concluding_save: false, + timestamp: cur_date, }, }; diff --git a/src/system/avro/V1MessageWriter.ts b/src/system/avro/V1MessageWriter.ts index 1d9a49c..03bbf6f 100644 --- a/src/system/avro/V1MessageWriter.ts +++ b/src/system/avro/V1MessageWriter.ts @@ -105,12 +105,26 @@ export class V1MessageWriter implements MessageWriter const last_update_ms = { "long": meta.lastUpdate * 1000 }; const ts_ms = ts * 1000; + + let step = null; + + if( delta.concluding_save === true ) + { + step = { + EventStep: { + transition: 'END', + src: '', + dest: '', + } + } + } + return { event: { id: event_id, ts: ts_ms, actor: 'SERVER', - step: null, + step: step, }, document: { id: meta.id, @@ -266,4 +280,4 @@ export class V1MessageWriter implements MessageWriter return data_formatted; } -} \ No newline at end of file +} diff --git a/test/server/service/RatingServiceTest.ts b/test/server/service/RatingServiceTest.ts index 546118c..016d75f 100644 --- a/test/server/service/RatingServiceTest.ts +++ b/test/server/service/RatingServiceTest.ts @@ -468,6 +468,7 @@ function getStubs() data: { _unavailable_all: [ undefined ] }, + concluding_save: false, timestamp: 123 } }; diff --git a/test/system/DeltaProcessorTest.ts b/test/system/DeltaProcessorTest.ts index e4a54d4..485af31 100644 --- a/test/system/DeltaProcessorTest.ts +++ b/test/system/DeltaProcessorTest.ts @@ -459,9 +459,10 @@ describe( 'system.DeltaProcessor', () => rdelta: { data: [ { - data: { foo: [ 'first_bar' ] }, - timestamp: 123123, - type: 'data', + data: { foo: [ 'first_bar' ] }, + timestamp: 123123, + type: 'data', + concluding_save: false, } ], ratedata: [], @@ -583,9 +584,10 @@ describe( 'system.DeltaProcessor', () => rdelta: { data: [ { - data: { foo: [ 'first_bar' ] }, - timestamp: 123123, - type: 'data', + data: { foo: [ 'first_bar' ] }, + timestamp: 123123, + type: 'data', + concluding_save: false, } ], ratedata: [], @@ -603,9 +605,10 @@ describe( 'system.DeltaProcessor', () => rdelta: { data: [ { - data: { foo: [ 'first_bar' ] }, - timestamp: 123123, - type: 'data', + data: { foo: [ 'first_bar' ] }, + timestamp: 123123, + type: 'data', + concluding_save: false, } ], ratedata: [], diff --git a/test/system/V1MessageWriterTest.ts b/test/system/V1MessageWriterTest.ts index 2debf9d..210fd22 100644 --- a/test/system/V1MessageWriterTest.ts +++ b/test/system/V1MessageWriterTest.ts @@ -189,17 +189,12 @@ describe( 'system.V1MessageWriter', () => if ( valid ) { - // return expect( result ).to.eventually.deep.equal( - // Buffer.from( '' ) - // ) - // .then( b => - // { - // expect( typeof(b) ).to.equal( 'object' ); - // } ); - return result.catch( e => + return expect( result ).to.eventually.deep.equal( + Buffer.from( '' ) + ) + .then( b => { - console.log( 'avroerror: ', e ); - expect.fail(); + expect( typeof(b) ).to.equal( 'object' ); } ); } else @@ -396,9 +391,10 @@ describe( 'system.V1MessageWriter', () => }; const delta = >{ - type: 'data', - timestamp: 123123123, - data: >{}, + type: 'data', + timestamp: 123123123, + data: >{}, + concluding_save: true, }; const expected = { @@ -406,7 +402,13 @@ describe( 'system.V1MessageWriter', () => id: 'STEP_SAVE', ts: ts * 1000, actor: 'SERVER', - step: null, + step: { + EventStep: { + transition: 'END', + src: '', + dest: '', + }, + }, }, document: { id: doc_id,