1
0
Fork 0
liza/src/system/MetricsCollector.ts

206 lines
5.8 KiB
TypeScript

/**
* Metrics Collector
*
* Copyright (C) 2010-2019 R-T Specialty, LLC.
*
* This file is part of liza.
*
* liza is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* Collect Metrics for Prometheus
*/
import { Histogram, Pushgateway, Counter, Gauge } from 'prom-client';
import { EventEmitter } from 'events';
import { PrometheusFactory, PrometheusConfig } from './PrometheusFactory';
const client = require( 'prom-client' )
export type MetricTimer = (
_start_time?: [ number, number ]
) => [ number, number ];
export class MetricsCollector
{
/** The prometheus PushGateway */
private _gateway: Pushgateway;
/** Delta processed time histogram */
private _process_time: Histogram;
private _process_time_name: string = 'liza_delta_process_time';
private _process_time_help: string = 'Delta process time in ms';
/** Delta error counter */
private _total_error: Counter;
private _total_error_name: string = 'liza_delta_error';
private _total_error_help: string = 'Total errors from delta processing';
/** Delta current error gauge */
private _current_error: Gauge;
private _current_error_name: string = 'liza_delta_current_error';
private _current_error_help: string =
'The current number of documents in an error state';
/** Delta error counter */
private _total_processed: Counter;
private _total_processed_name: string = 'liza_delta_success';
private _total_processed_help: string =
'Total deltas successfully processed';
/** Timing map */
private _timing_map: Record<string, [ number, number ]> = {};
private _push_interval: NodeJS.Timer;
/**
* Initialize delta logger
*
* @param _factory - A factory to create prometheus components
* @param _conf - Prometheus configuration
* @param _emitter - Event emitter
* @param _timer - A timer function to create a tuple timestamp
*/
constructor(
private readonly _factory: PrometheusFactory,
private readonly _conf: PrometheusConfig,
private readonly _emitter: EventEmitter,
private readonly _timer: MetricTimer,
) {
// Set labels
client.register.setDefaultLabels( {
env: this._conf.env,
service: 'delta_processor',
} );
// Create metrics
this._gateway = this._factory.createGateway(
client,
this._conf.hostname,
this._conf.port,
);
this._process_time = this._factory.createHistogram(
client,
this._process_time_name,
this._process_time_help,
this._conf.buckets_start,
this._conf.buckets_width,
this._conf.buckets_count,
);
this._total_error = this._factory.createCounter(
client,
this._total_error_name,
this._total_error_help,
);
this._current_error = this._factory.createGauge(
client,
this._current_error_name,
this._current_error_help,
);
this._total_processed = this._factory.createCounter(
client,
this._total_processed_name,
this._total_processed_help,
);
// Push metrics on a specific interval
this._push_interval = setInterval( () =>
{
this._gateway.pushAdd(
{ jobName: 'liza_delta_metrics' },
this.getPushCallback( this )
);
}, this._conf.push_interval_ms
);
// Subsribe metrics to events
this.hookMetrics();
}
/**
* Stop the push interval
*/
stop(): void
{
clearInterval( this._push_interval );
}
/**
* List to events to update metrics
*/
private hookMetrics(): void
{
this._emitter.on(
'delta-process-start',
( uid: string ) => { this._timing_map[ uid ] = this._timer(); }
);
this._emitter.on(
'delta-process-end',
( uid: string ) =>
{
const start_time_ms = this._timing_map[ uid ] || [ -1, -1 ];
const t = this._timer( start_time_ms );
const total_time_ms = t[ 0 ] * 1000 + t[ 1 ] / 1000000;
this._process_time.observe( total_time_ms );
this._total_processed.inc();
}
);
this._emitter.on( 'error', ( _ ) => this._total_error.inc() );
}
/**
* Handle push error
*
* @param self - Metrics Collector object
*
* @return a function to handle the pushAdd callback
*/
private getPushCallback( self: MetricsCollector ): () => void
{
return (
error?: Error | undefined,
_response?: any,
_body?: any
): void =>
{
if ( error )
{
self._emitter.emit( 'error', error );
}
}
}
/**
* Update metrics with current error count
*
* @param count - the number of errors found
*/
updateErrorCount( count: number ): void
{
this._current_error.set( +count );
}
}