123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144 |
- 'use strict'
- /*
- * merge2
- * https://github.com/teambition/merge2
- *
- * Copyright (c) 2014-2020 Teambition
- * Licensed under the MIT license.
- */
- const Stream = require('stream')
- const PassThrough = Stream.PassThrough
- const slice = Array.prototype.slice
-
- module.exports = merge2
-
/**
 * Merge several readable streams into a single PassThrough stream.
 *
 * Every argument is one "group": either a single readable stream or an array
 * of readable streams. Groups are merged strictly one after another, in the
 * order they were queued; the streams inside one array group are all piped
 * into the result at the same time. If the last argument is a plain object
 * (truthy, not an array, and without a `pipe` member) it is taken as options.
 *
 * Recognised options (the whole object is also handed to PassThrough):
 *   - end:           pass `false` to keep the merged stream open once the
 *                    queue is drained (default: the merged stream is ended).
 *   - pipeError:     pass `true` to re-emit each source's 'error' event on
 *                    the merged stream (default: not forwarded).
 *   - objectMode:    defaults to `true` when not provided.
 *   - highWaterMark: defaults to 64 * 1024 when not provided.
 *
 * The caller's options object is never modified; defaults are applied to a
 * shallow copy.
 *
 * @param {...*} args Streams, arrays of streams, and optionally a trailing
 *   options object.
 * @returns {PassThrough} The merged stream. It carries an extra
 *   `add(...groups)` method for queueing more groups and emits 'queueDrain'
 *   each time the queue has been fully merged.
 * @throws {Error} If a queued item is not a readable stream (raised by
 *   pauseStreams).
 */
function merge2 (...args) {
  // Pending groups, each a stream or an array of streams, in merge order.
  const streamsQueue = []
  // True while a group is being piped or the end of the queue is scheduled.
  let merging = false

  const lastArg = args[args.length - 1]
  let options = {}
  if (lastArg && !Array.isArray(lastArg) && lastArg.pipe == null) {
    args.pop()
    // Shallow copy: the defaults written below must not leak into the
    // object owned by the caller.
    options = Object.assign({}, lastArg)
  }

  const doEnd = options.end !== false
  const doPipeError = options.pipeError === true
  if (options.objectMode == null) {
    options.objectMode = true
  }
  if (options.highWaterMark == null) {
    options.highWaterMark = 64 * 1024
  }
  const mergedStream = new PassThrough(options)

  // Queue one group per argument and start merging if idle.
  // Exposed as `mergedStream.add`; returns `this` so calls can be chained.
  function addStream (...groups) {
    for (let i = 0, len = groups.length; i < len; i++) {
      streamsQueue.push(pauseStreams(groups[i], options))
    }
    mergeStream()
    return this
  }

  // Take the next group off the queue and pipe all of its streams into
  // mergedStream. Does nothing while a previous group is still being merged.
  function mergeStream () {
    if (merging) {
      return
    }
    merging = true

    let streams = streamsQueue.shift()
    if (!streams) {
      // Queue exhausted: finish on the next tick.
      process.nextTick(endStream)
      return
    }
    if (!Array.isArray(streams)) {
      streams = [streams]
    }

    // One count per stream plus one for the trailing next() call below, so
    // the group cannot be considered finished before every stream of it has
    // been set up (already-ended streams call next() synchronously).
    let pipesCount = streams.length + 1

    // Called once per finished stream; advances to the next group when the
    // counter reaches zero.
    function next () {
      if (--pipesCount > 0) {
        return
      }
      merging = false
      mergeStream()
    }

    function pipe (stream) {
      // Runs on either 'end' or 'merge2UnpipeEnd'; detaches every listener
      // first so the stream is counted exactly once.
      function onend () {
        stream.removeListener('merge2UnpipeEnd', onend)
        stream.removeListener('end', onend)
        if (doPipeError) {
          stream.removeListener('error', onerror)
        }
        next()
      }
      function onerror (err) {
        mergedStream.emit('error', err)
      }
      // skip ended stream
      if (stream._readableState.endEmitted) {
        return next()
      }

      stream.on('merge2UnpipeEnd', onend)
      stream.on('end', onend)

      if (doPipeError) {
        stream.on('error', onerror)
      }

      // `end: false` keeps mergedStream writable for the remaining sources.
      stream.pipe(mergedStream, { end: false })
      // compatible for old stream
      stream.resume()
    }

    for (let i = 0; i < streams.length; i++) {
      pipe(streams[i])
    }

    next()
  }

  // Invoked when the queue is empty: announce the drain and, unless the
  // `end: false` option was given, end the merged stream.
  function endStream () {
    merging = false
    // emit 'queueDrain' when all streams merged.
    mergedStream.emit('queueDrain')
    if (doEnd) {
      mergedStream.end()
    }
  }

  // 0 removes the listener-count limit on the merged stream.
  mergedStream.setMaxListeners(0)
  mergedStream.add = addStream
  // A source that gets unpiped from mergedStream is treated as finished:
  // forward the event to the source so its onend handler runs.
  mergedStream.on('unpipe', function (stream) {
    stream.emit('merge2UnpipeEnd')
  })

  if (args.length) {
    addStream.apply(null, args)
  }
  return mergedStream
}
-
- // check and pause streams for pipe.
/**
 * Validate a merge group and pause its stream(s) before they are piped.
 *
 * A single stream is checked, paused and returned. An old-style stream
 * (one with `pipe` but no `_readableState`) is first piped into a new
 * PassThrough created from `options`, and that wrapper is what gets paused
 * and returned. An array is processed element by element and a new array
 * with the results is returned; the array passed in is not modified.
 *
 * @param {*} streams A readable stream or an array of readable streams.
 * @param {Object} options Options for the PassThrough that wraps old-style
 *   streams.
 * @returns {*} The paused stream (or its wrapper), or a new array of them.
 * @throws {Error} 'Only readable stream can be merged.' when an item is
 *   null/undefined or lacks `_readableState`, `pause` or `pipe`.
 */
function pauseStreams (streams, options) {
  if (Array.isArray(streams)) {
    // Build a fresh array rather than overwriting the caller's elements.
    return streams.map((item) => pauseStreams(item, options))
  }

  // Reject null/undefined up front so the caller gets the descriptive error
  // instead of a TypeError from the property reads below.
  if (streams == null) {
    throw new Error('Only readable stream can be merged.')
  }

  let stream = streams
  // Backwards-compat with old-style streams
  if (!stream._readableState && stream.pipe) {
    stream = stream.pipe(new PassThrough(options))
  }
  if (!stream._readableState || !stream.pause || !stream.pipe) {
    throw new Error('Only readable stream can be merged.')
  }
  stream.pause()
  return stream
}
|