123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186 |
- 'use strict'
-
- var reusify = require('reusify')
-
- function fastqueue (context, worker, concurrency) {
- if (typeof context === 'function') {
- concurrency = worker
- worker = context
- context = null
- }
-
- var cache = reusify(Task)
- var queueHead = null
- var queueTail = null
- var _running = 0
-
- var self = {
- push: push,
- drain: noop,
- saturated: noop,
- pause: pause,
- paused: false,
- concurrency: concurrency,
- running: running,
- resume: resume,
- idle: idle,
- length: length,
- getQueue: getQueue,
- unshift: unshift,
- empty: noop,
- kill: kill,
- killAndDrain: killAndDrain
- }
-
- return self
-
- function running () {
- return _running
- }
-
- function pause () {
- self.paused = true
- }
-
- function length () {
- var current = queueHead
- var counter = 0
-
- while (current) {
- current = current.next
- counter++
- }
-
- return counter
- }
-
- function getQueue () {
- var current = queueHead
- var tasks = []
-
- while (current) {
- tasks.push(current.value)
- current = current.next
- }
-
- return tasks
- }
-
- function resume () {
- if (!self.paused) return
- self.paused = false
- for (var i = 0; i < self.concurrency; i++) {
- _running++
- release()
- }
- }
-
- function idle () {
- return _running === 0 && self.length() === 0
- }
-
- function push (value, done) {
- var current = cache.get()
-
- current.context = context
- current.release = release
- current.value = value
- current.callback = done || noop
-
- if (_running === self.concurrency || self.paused) {
- if (queueTail) {
- queueTail.next = current
- queueTail = current
- } else {
- queueHead = current
- queueTail = current
- self.saturated()
- }
- } else {
- _running++
- worker.call(context, current.value, current.worked)
- }
- }
-
- function unshift (value, done) {
- var current = cache.get()
-
- current.context = context
- current.release = release
- current.value = value
- current.callback = done || noop
-
- if (_running === self.concurrency || self.paused) {
- if (queueHead) {
- current.next = queueHead
- queueHead = current
- } else {
- queueHead = current
- queueTail = current
- self.saturated()
- }
- } else {
- _running++
- worker.call(context, current.value, current.worked)
- }
- }
-
- function release (holder) {
- if (holder) {
- cache.release(holder)
- }
- var next = queueHead
- if (next) {
- if (!self.paused) {
- if (queueTail === queueHead) {
- queueTail = null
- }
- queueHead = next.next
- next.next = null
- worker.call(context, next.value, next.worked)
- if (queueTail === null) {
- self.empty()
- }
- } else {
- _running--
- }
- } else if (--_running === 0) {
- self.drain()
- }
- }
-
- function kill () {
- queueHead = null
- queueTail = null
- self.drain = noop
- }
-
- function killAndDrain () {
- queueHead = null
- queueTail = null
- self.drain()
- self.drain = noop
- }
- }
-
- function noop () {}
-
- function Task () {
- this.value = null
- this.callback = noop
- this.next = null
- this.release = noop
- this.context = null
-
- var self = this
-
- this.worked = function worked (err, result) {
- var callback = self.callback
- self.value = null
- self.callback = noop
- callback.call(self.context, err, result)
- self.release(self)
- }
- }
-
- module.exports = fastqueue
|