237 lines
5.0 KiB
JavaScript
237 lines
5.0 KiB
JavaScript
|
var util = require('util');
|
|||
|
var events = require('events').EventEmitter;
|
|||
|
|
|||
|
/**
 * Job queue that launches queued jobs while keeping at most
 * `maxConcurrency` of them running at the same time.
 *
 * Inherits from EventEmitter. Can be called with or without `new`.
 *
 * @param {Object} [options]
 * @param {number} [options.maxConcurrency=10] maximum number of jobs
 *        running at once (a falsy value keeps the default)
 * @param {number} [options.interval=null] delay in milliseconds passed to
 *        setTimeout between two bursts of jobs; null (or any falsy value)
 *        disables the burst/sleep behaviour
 */
var qjob = function(options) {

    // allow `qjob(options)` as well as `new qjob(options)`
    if (!(this instanceof qjob)) {
        return new qjob(options);
    }

    // counters
    this.maxConcurrency = 10;
    this.jobsRunning = 0;
    this.jobsDone = 0;
    this.jobsTotal = 0;
    // stays undefined until run() starts the queue; stats() reports 'N/A'
    // while it is not set, so it must not be initialised to null or 0
    this.timeStart = undefined;
    this.jobId = 0;

    // pending jobs, stored as [job, args] pairs
    this.jobsList = [];

    // pause state
    this.paused = false;
    this.pausedId = null;
    this.lastPause = 0;

    // burst/sleep state
    this.interval = null;
    this.stopAdding = false;
    this.sleeping = false;

    this.aborting = false;

    if (options) {
        this.maxConcurrency = options.maxConcurrency || this.maxConcurrency;
        this.interval = options.interval || this.interval;
    }

    events.call(this);
};

util.inherits(qjob, events);
|
|||
|
|
|||
|
/**
 * Change the maximum number of jobs allowed to run at the same time.
 *
 * @param {number} max new concurrency limit
 */
qjob.prototype.setConcurrency = function(max) {
    var queue = this;
    queue.maxConcurrency = max;
};
|
|||
|
|
|||
|
/**
 * Change the delay between two bursts of jobs.
 *
 * @param {number} delay delay handed to setTimeout when the queue sleeps
 */
qjob.prototype.setInterval = function(delay) {
    var queue = this;
    queue.interval = delay;
};
|
|||
|
|
|||
|
/**
 * Append a job to the queue.
 *
 * @param {Function} job  function called as job(args, next) when launched
 * @param {Object} [args] argument object handed to the job; run() stores
 *        the job id on it as `_jobId`, so a missing value is replaced by a
 *        fresh object instead of crashing later in run()
 */
qjob.prototype.add = function(job, args) {
    var self = this;

    if (args === undefined || args === null) {
        args = {};
    }

    self.jobsList.push([job, args]);
    self.jobsTotal++;
};
|
|||
|
|
|||
|
/**
 * Burst handling, used by run() when an interval is configured.
 *
 * A burst ends once the queue is at capacity and one of its jobs finishes.
 * From then on nothing new is launched: the queue waits for the jobs still
 * running, emits 'sleep', waits `interval` milliseconds, emits 'continu'
 * and calls run() again.
 *
 * @returns {boolean} true when run() must not launch jobs right now
 */
qjob.prototype.sleepDueToInterval = function() {
    var self = this;

    // no interval configured: bursts are disabled
    if (self.interval === null) {
        return false;
    }

    // already waiting for the wake-up timer
    if (self.sleeping) {
        return true;
    }

    if (!self.stopAdding) {
        // Only a job that finished since the last wake-up can end a burst.
        // Without this check a queue with maxConcurrency 1 would be stopped
        // before its very first job is launched and would never run.
        var doneSinceWake = self.jobsDone - (self.jobsDoneAtWake || 0);
        if (doneSinceWake <= 0 || self.jobsRunning + 1 < self.maxConcurrency) {
            return false;
        }
        //console.log('max concurrent jobs reached');
        self.stopAdding = true;
    }

    // burst is over: let the jobs still running finish first
    if (self.jobsRunning > 0) {
        //console.log('waiting for '+jobsRunning+' jobs to finish');
        return true;
    }

    //console.log('waiting for '+rafaleDelay+' ms');
    self.sleeping = true;
    self.emit('sleep');

    setTimeout(function() {
        self.stopAdding = false;
        self.sleeping = false;
        // remember where the next burst starts counting from
        self.jobsDoneAtWake = self.jobsDone;
        self.emit('continu');
        self.run();
    }, self.interval);

    return true;
};
|
|||
|
|
|||
|
/**
 * Run the queue: launch pending jobs until `maxConcurrency` jobs are
 * running, and emit 'end' once nothing is pending and nothing is running.
 *
 * Emits 'start' (and records `timeStart`) only on the first effective call.
 * Does nothing while the queue is paused or while sleepDueToInterval()
 * asks to hold back. Each launched job is called asynchronously as
 * job(args, next), where calling `next` reports the job as finished.
 */
qjob.prototype.run = function() {
    var self = this;

    // a paused queue must not launch pending jobs, whoever calls run()
    if (self.paused) {
        return;
    }

    // first launch, let's emit start event; keyed on timeStart so that
    // calling run() again before any job has finished does not emit
    // 'start' twice or reset the start time
    if (typeof self.timeStart !== 'number') {
        self.emit('start');
        self.timeStart = Date.now();
    }

    if (self.sleepDueToInterval()) {
        return;
    }

    // abort requested: drop everything still pending
    if (self.aborting) {
        self.jobsList = [];
    }

    // while the queue is not empty and fewer jobs than the maximum are
    // running, launch the next job
    while (self.jobsList.length && self.jobsRunning < self.maxConcurrency) {
        // get the next job and remove it from the queue
        var job = self.jobsList.shift();

        // increment number of job running
        self.jobsRunning++;

        // fetch args for the job; a job queued without args gets a fresh
        // object so the job id can be stored on it
        var args = job[1];
        if (args === undefined || args === null) {
            args = {};
        }

        // add jobId in args
        args._jobId = self.jobId++;

        // emit jobStart event
        self.emit('jobStart', args);

        // run the job; job and args are bound into the callback because
        // the `var` loop variables are shared by every iteration
        setTimeout(function() {
            this.j(this.args, self.next.bind(self, this.args));
        }.bind({ j: job[0], args: args }), 1);
    }

    // all jobs done ? emit end event
    if (self.jobsList.length === 0 && self.jobsRunning === 0) {
        self.emit('end');
    }
};
|
|||
|
|
|||
|
/**
 * Completion handler for a job: run() hands every job a `next` callback
 * bound to this method and to the job's args.
 *
 * Updates the counters, emits 'jobEnd' with the job's args and, unless the
 * queue is paused, calls run() again.
 *
 * @param {Object} args the args object of the job that just finished
 */
qjob.prototype.next = function(args) {
    var queue = this;

    // one job less running, one more done
    queue.jobsRunning -= 1;
    queue.jobsDone += 1;

    queue.emit('jobEnd', args);

    // a paused queue stays idle until pause(false) calls run() again
    if (!queue.paused) {
        queue.run();
    }
};
|
|||
|
|
|||
|
/**
 * Pause or resume the queue.
 *
 * Pausing does not stop jobs that are already running; it only prevents
 * pending jobs from being launched until the queue is resumed. While
 * paused, a 'pause' event carrying the milliseconds elapsed since the
 * pause began is emitted every second. Resuming emits 'unpause' and calls
 * run().
 *
 * @param {boolean} status true to pause, false to resume
 */
qjob.prototype.pause = function(status) {
    var self = this;

    self.paused = status;

    if (!self.paused && self.pausedId) {
        clearInterval(self.pausedId);
        // forget the cleared timer, otherwise the next pause(true) would
        // see a stale id and never start a new 'pause' ticker
        self.pausedId = null;
        self.emit('unpause');
        self.run();
    }

    if (self.paused && !self.pausedId) {
        self.lastPause = Date.now();
        self.pausedId = setInterval(function() {
            var since = Date.now() - self.lastPause;
            self.emit('pause', since);
        }, 1000);
    }
};
|
|||
|
|
|||
|
/**
 * Snapshot of the queue state.
 *
 * @returns {Object} object with the fields
 *   _timeStart   start timestamp in ms, or 'N/A' before the queue started
 *   _timeElapsed ms since the start, or 'N/A' before the queue started
 *   _jobsTotal, _jobsRunning, _jobsDone   job counters
 *   _progress    percentage of jobs done, rounded down (0 when no job was
 *                ever added)
 *   _concurrency current maximum concurrency
 *   _status      'Paused', 'Starting', 'Finished' or 'Running'
 */
qjob.prototype.stats = function() {
    var now = Date.now();

    // timeStart is only a number once run() has started the queue; testing
    // the type (not truthiness) keeps an elapsed time of 0 ms valid
    var started = typeof this.timeStart === 'number';

    var o = {};
    o._timeStart = started ? this.timeStart : 'N/A';
    o._timeElapsed = started ? now - this.timeStart : 'N/A';
    o._jobsTotal = this.jobsTotal;
    o._jobsRunning = this.jobsRunning;
    o._jobsDone = this.jobsDone;
    // avoid 0/0 = NaN when no job has been added yet
    o._progress = this.jobsTotal > 0
        ? Math.floor((this.jobsDone / this.jobsTotal) * 100)
        : 0;
    o._concurrency = this.maxConcurrency;

    if (this.paused) {
        o._status = 'Paused';
    } else if (!started) {
        o._status = 'Starting';
    } else if (this.jobsTotal === this.jobsDone) {
        o._status = 'Finished';
    } else {
        o._status = 'Running';
    }

    return o;
};
|
|||
|
|
|||
|
/**
 * Flag the queue as aborting. The flag is read by run(), which empties the
 * list of pending jobs when it is set; this method itself only sets the flag.
 */
qjob.prototype.abort = function() {
    var queue = this;
    queue.aborting = true;
};
|
|||
|
|
|||
|
// Export the queue constructor; it also works when called without `new`.
module.exports = qjob;
|