You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
1018 lines
32 KiB
1018 lines
32 KiB
const proc = typeof process === 'object' && process |
|
? process |
|
: { |
|
stdout: null, |
|
stderr: null, |
|
}; |
|
import { EventEmitter } from 'node:events'; |
|
import Stream from 'node:stream'; |
|
import { StringDecoder } from 'node:string_decoder'; |
|
/** |
|
* Return true if the argument is a Minipass stream, Node stream, or something |
|
* else that Minipass can interact with. |
|
*/ |
|
export const isStream = (s) => !!s && |
|
typeof s === 'object' && |
|
(s instanceof Minipass || |
|
s instanceof Stream || |
|
isReadable(s) || |
|
isWritable(s)); |
|
/** |
|
* Return true if the argument is a valid {@link Minipass.Readable} |
|
*/ |
|
export const isReadable = (s) => !!s && |
|
typeof s === 'object' && |
|
s instanceof EventEmitter && |
|
typeof s.pipe === 'function' && |
|
// node core Writable streams have a pipe() method, but it throws |
|
s.pipe !== Stream.Writable.prototype.pipe; |
|
/** |
|
* Return true if the argument is a valid {@link Minipass.Writable} |
|
*/ |
|
export const isWritable = (s) => !!s && |
|
typeof s === 'object' && |
|
s instanceof EventEmitter && |
|
typeof s.write === 'function' && |
|
typeof s.end === 'function'; |
|
const EOF = Symbol('EOF'); |
|
const MAYBE_EMIT_END = Symbol('maybeEmitEnd'); |
|
const EMITTED_END = Symbol('emittedEnd'); |
|
const EMITTING_END = Symbol('emittingEnd'); |
|
const EMITTED_ERROR = Symbol('emittedError'); |
|
const CLOSED = Symbol('closed'); |
|
const READ = Symbol('read'); |
|
const FLUSH = Symbol('flush'); |
|
const FLUSHCHUNK = Symbol('flushChunk'); |
|
const ENCODING = Symbol('encoding'); |
|
const DECODER = Symbol('decoder'); |
|
const FLOWING = Symbol('flowing'); |
|
const PAUSED = Symbol('paused'); |
|
const RESUME = Symbol('resume'); |
|
const BUFFER = Symbol('buffer'); |
|
const PIPES = Symbol('pipes'); |
|
const BUFFERLENGTH = Symbol('bufferLength'); |
|
const BUFFERPUSH = Symbol('bufferPush'); |
|
const BUFFERSHIFT = Symbol('bufferShift'); |
|
const OBJECTMODE = Symbol('objectMode'); |
|
// internal event when stream is destroyed |
|
const DESTROYED = Symbol('destroyed'); |
|
// internal event when stream has an error |
|
const ERROR = Symbol('error'); |
|
const EMITDATA = Symbol('emitData'); |
|
const EMITEND = Symbol('emitEnd'); |
|
const EMITEND2 = Symbol('emitEnd2'); |
|
const ASYNC = Symbol('async'); |
|
const ABORT = Symbol('abort'); |
|
const ABORTED = Symbol('aborted'); |
|
const SIGNAL = Symbol('signal'); |
|
const DATALISTENERS = Symbol('dataListeners'); |
|
const DISCARDED = Symbol('discarded'); |
|
const defer = (fn) => Promise.resolve().then(fn); |
|
const nodefer = (fn) => fn(); |
|
const isEndish = (ev) => ev === 'end' || ev === 'finish' || ev === 'prefinish'; |
|
const isArrayBufferLike = (b) => b instanceof ArrayBuffer || |
|
(!!b && |
|
typeof b === 'object' && |
|
b.constructor && |
|
b.constructor.name === 'ArrayBuffer' && |
|
b.byteLength >= 0); |
|
const isArrayBufferView = (b) => !Buffer.isBuffer(b) && ArrayBuffer.isView(b); |
|
/** |
|
* Internal class representing a pipe to a destination stream. |
|
* |
|
* @internal |
|
*/ |
|
class Pipe { |
|
src; |
|
dest; |
|
opts; |
|
ondrain; |
|
constructor(src, dest, opts) { |
|
this.src = src; |
|
this.dest = dest; |
|
this.opts = opts; |
|
this.ondrain = () => src[RESUME](); |
|
this.dest.on('drain', this.ondrain); |
|
} |
|
unpipe() { |
|
this.dest.removeListener('drain', this.ondrain); |
|
} |
|
// only here for the prototype |
|
/* c8 ignore start */ |
|
proxyErrors(_er) { } |
|
/* c8 ignore stop */ |
|
end() { |
|
this.unpipe(); |
|
if (this.opts.end) |
|
this.dest.end(); |
|
} |
|
} |
|
/** |
|
* Internal class representing a pipe to a destination stream where |
|
* errors are proxied. |
|
* |
|
* @internal |
|
*/ |
|
class PipeProxyErrors extends Pipe { |
|
unpipe() { |
|
this.src.removeListener('error', this.proxyErrors); |
|
super.unpipe(); |
|
} |
|
constructor(src, dest, opts) { |
|
super(src, dest, opts); |
|
this.proxyErrors = er => dest.emit('error', er); |
|
src.on('error', this.proxyErrors); |
|
} |
|
} |
|
const isObjectModeOptions = (o) => !!o.objectMode; |
|
const isEncodingOptions = (o) => !o.objectMode && !!o.encoding && o.encoding !== 'buffer'; |
|
/** |
|
* Main export, the Minipass class |
|
* |
|
* `RType` is the type of data emitted, defaults to Buffer |
|
* |
|
* `WType` is the type of data to be written, if RType is buffer or string, |
|
* then any {@link Minipass.ContiguousData} is allowed. |
|
* |
|
* `Events` is the set of event handler signatures that this object |
|
* will emit, see {@link Minipass.Events} |
|
*/ |
|
export class Minipass extends EventEmitter { |
|
[FLOWING] = false; |
|
[PAUSED] = false; |
|
[PIPES] = []; |
|
[BUFFER] = []; |
|
[OBJECTMODE]; |
|
[ENCODING]; |
|
[ASYNC]; |
|
[DECODER]; |
|
[EOF] = false; |
|
[EMITTED_END] = false; |
|
[EMITTING_END] = false; |
|
[CLOSED] = false; |
|
[EMITTED_ERROR] = null; |
|
[BUFFERLENGTH] = 0; |
|
[DESTROYED] = false; |
|
[SIGNAL]; |
|
[ABORTED] = false; |
|
[DATALISTENERS] = 0; |
|
[DISCARDED] = false; |
|
/** |
|
* true if the stream can be written |
|
*/ |
|
writable = true; |
|
/** |
|
* true if the stream can be read |
|
*/ |
|
readable = true; |
|
/** |
|
* If `RType` is Buffer, then options do not need to be provided. |
|
* Otherwise, an options object must be provided to specify either |
|
* {@link Minipass.SharedOptions.objectMode} or |
|
* {@link Minipass.SharedOptions.encoding}, as appropriate. |
|
*/ |
|
constructor(...args) { |
|
const options = (args[0] || |
|
{}); |
|
super(); |
|
if (options.objectMode && typeof options.encoding === 'string') { |
|
throw new TypeError('Encoding and objectMode may not be used together'); |
|
} |
|
if (isObjectModeOptions(options)) { |
|
this[OBJECTMODE] = true; |
|
this[ENCODING] = null; |
|
} |
|
else if (isEncodingOptions(options)) { |
|
this[ENCODING] = options.encoding; |
|
this[OBJECTMODE] = false; |
|
} |
|
else { |
|
this[OBJECTMODE] = false; |
|
this[ENCODING] = null; |
|
} |
|
this[ASYNC] = !!options.async; |
|
this[DECODER] = this[ENCODING] |
|
? new StringDecoder(this[ENCODING]) |
|
: null; |
|
//@ts-ignore - private option for debugging and testing |
|
if (options && options.debugExposeBuffer === true) { |
|
Object.defineProperty(this, 'buffer', { get: () => this[BUFFER] }); |
|
} |
|
//@ts-ignore - private option for debugging and testing |
|
if (options && options.debugExposePipes === true) { |
|
Object.defineProperty(this, 'pipes', { get: () => this[PIPES] }); |
|
} |
|
const { signal } = options; |
|
if (signal) { |
|
this[SIGNAL] = signal; |
|
if (signal.aborted) { |
|
this[ABORT](); |
|
} |
|
else { |
|
signal.addEventListener('abort', () => this[ABORT]()); |
|
} |
|
} |
|
} |
|
/** |
|
* The amount of data stored in the buffer waiting to be read. |
|
* |
|
* For Buffer strings, this will be the total byte length. |
|
* For string encoding streams, this will be the string character length, |
|
* according to JavaScript's `string.length` logic. |
|
* For objectMode streams, this is a count of the items waiting to be |
|
* emitted. |
|
*/ |
|
get bufferLength() { |
|
return this[BUFFERLENGTH]; |
|
} |
|
/** |
|
* The `BufferEncoding` currently in use, or `null` |
|
*/ |
|
get encoding() { |
|
return this[ENCODING]; |
|
} |
|
/** |
|
* @deprecated - This is a read only property |
|
*/ |
|
set encoding(_enc) { |
|
throw new Error('Encoding must be set at instantiation time'); |
|
} |
|
/** |
|
* @deprecated - Encoding may only be set at instantiation time |
|
*/ |
|
setEncoding(_enc) { |
|
throw new Error('Encoding must be set at instantiation time'); |
|
} |
|
/** |
|
* True if this is an objectMode stream |
|
*/ |
|
get objectMode() { |
|
return this[OBJECTMODE]; |
|
} |
|
/** |
|
* @deprecated - This is a read-only property |
|
*/ |
|
set objectMode(_om) { |
|
throw new Error('objectMode must be set at instantiation time'); |
|
} |
|
/** |
|
* true if this is an async stream |
|
*/ |
|
get ['async']() { |
|
return this[ASYNC]; |
|
} |
|
/** |
|
* Set to true to make this stream async. |
|
* |
|
* Once set, it cannot be unset, as this would potentially cause incorrect |
|
* behavior. Ie, a sync stream can be made async, but an async stream |
|
* cannot be safely made sync. |
|
*/ |
|
set ['async'](a) { |
|
this[ASYNC] = this[ASYNC] || !!a; |
|
} |
|
// drop everything and get out of the flow completely |
|
[ABORT]() { |
|
this[ABORTED] = true; |
|
this.emit('abort', this[SIGNAL]?.reason); |
|
this.destroy(this[SIGNAL]?.reason); |
|
} |
|
/** |
|
* True if the stream has been aborted. |
|
*/ |
|
get aborted() { |
|
return this[ABORTED]; |
|
} |
|
/** |
|
* No-op setter. Stream aborted status is set via the AbortSignal provided |
|
* in the constructor options. |
|
*/ |
|
set aborted(_) { } |
|
write(chunk, encoding, cb) { |
|
if (this[ABORTED]) |
|
return false; |
|
if (this[EOF]) |
|
throw new Error('write after end'); |
|
if (this[DESTROYED]) { |
|
this.emit('error', Object.assign(new Error('Cannot call write after a stream was destroyed'), { code: 'ERR_STREAM_DESTROYED' })); |
|
return true; |
|
} |
|
if (typeof encoding === 'function') { |
|
cb = encoding; |
|
encoding = 'utf8'; |
|
} |
|
if (!encoding) |
|
encoding = 'utf8'; |
|
const fn = this[ASYNC] ? defer : nodefer; |
|
// convert array buffers and typed array views into buffers |
|
// at some point in the future, we may want to do the opposite! |
|
// leave strings and buffers as-is |
|
// anything is only allowed if in object mode, so throw |
|
if (!this[OBJECTMODE] && !Buffer.isBuffer(chunk)) { |
|
if (isArrayBufferView(chunk)) { |
|
//@ts-ignore - sinful unsafe type changing |
|
chunk = Buffer.from(chunk.buffer, chunk.byteOffset, chunk.byteLength); |
|
} |
|
else if (isArrayBufferLike(chunk)) { |
|
//@ts-ignore - sinful unsafe type changing |
|
chunk = Buffer.from(chunk); |
|
} |
|
else if (typeof chunk !== 'string') { |
|
throw new Error('Non-contiguous data written to non-objectMode stream'); |
|
} |
|
} |
|
// handle object mode up front, since it's simpler |
|
// this yields better performance, fewer checks later. |
|
if (this[OBJECTMODE]) { |
|
// maybe impossible? |
|
/* c8 ignore start */ |
|
if (this[FLOWING] && this[BUFFERLENGTH] !== 0) |
|
this[FLUSH](true); |
|
/* c8 ignore stop */ |
|
if (this[FLOWING]) |
|
this.emit('data', chunk); |
|
else |
|
this[BUFFERPUSH](chunk); |
|
if (this[BUFFERLENGTH] !== 0) |
|
this.emit('readable'); |
|
if (cb) |
|
fn(cb); |
|
return this[FLOWING]; |
|
} |
|
// at this point the chunk is a buffer or string |
|
// don't buffer it up or send it to the decoder |
|
if (!chunk.length) { |
|
if (this[BUFFERLENGTH] !== 0) |
|
this.emit('readable'); |
|
if (cb) |
|
fn(cb); |
|
return this[FLOWING]; |
|
} |
|
// fast-path writing strings of same encoding to a stream with |
|
// an empty buffer, skipping the buffer/decoder dance |
|
if (typeof chunk === 'string' && |
|
// unless it is a string already ready for us to use |
|
!(encoding === this[ENCODING] && !this[DECODER]?.lastNeed)) { |
|
//@ts-ignore - sinful unsafe type change |
|
chunk = Buffer.from(chunk, encoding); |
|
} |
|
if (Buffer.isBuffer(chunk) && this[ENCODING]) { |
|
//@ts-ignore - sinful unsafe type change |
|
chunk = this[DECODER].write(chunk); |
|
} |
|
// Note: flushing CAN potentially switch us into not-flowing mode |
|
if (this[FLOWING] && this[BUFFERLENGTH] !== 0) |
|
this[FLUSH](true); |
|
if (this[FLOWING]) |
|
this.emit('data', chunk); |
|
else |
|
this[BUFFERPUSH](chunk); |
|
if (this[BUFFERLENGTH] !== 0) |
|
this.emit('readable'); |
|
if (cb) |
|
fn(cb); |
|
return this[FLOWING]; |
|
} |
|
/** |
|
* Low-level explicit read method. |
|
* |
|
* In objectMode, the argument is ignored, and one item is returned if |
|
* available. |
|
* |
|
* `n` is the number of bytes (or in the case of encoding streams, |
|
* characters) to consume. If `n` is not provided, then the entire buffer |
|
* is returned, or `null` is returned if no data is available. |
|
* |
|
* If `n` is greater that the amount of data in the internal buffer, |
|
* then `null` is returned. |
|
*/ |
|
read(n) { |
|
if (this[DESTROYED]) |
|
return null; |
|
this[DISCARDED] = false; |
|
if (this[BUFFERLENGTH] === 0 || |
|
n === 0 || |
|
(n && n > this[BUFFERLENGTH])) { |
|
this[MAYBE_EMIT_END](); |
|
return null; |
|
} |
|
if (this[OBJECTMODE]) |
|
n = null; |
|
if (this[BUFFER].length > 1 && !this[OBJECTMODE]) { |
|
// not object mode, so if we have an encoding, then RType is string |
|
// otherwise, must be Buffer |
|
this[BUFFER] = [ |
|
(this[ENCODING] |
|
? this[BUFFER].join('') |
|
: Buffer.concat(this[BUFFER], this[BUFFERLENGTH])), |
|
]; |
|
} |
|
const ret = this[READ](n || null, this[BUFFER][0]); |
|
this[MAYBE_EMIT_END](); |
|
return ret; |
|
} |
|
[READ](n, chunk) { |
|
if (this[OBJECTMODE]) |
|
this[BUFFERSHIFT](); |
|
else { |
|
const c = chunk; |
|
if (n === c.length || n === null) |
|
this[BUFFERSHIFT](); |
|
else if (typeof c === 'string') { |
|
this[BUFFER][0] = c.slice(n); |
|
chunk = c.slice(0, n); |
|
this[BUFFERLENGTH] -= n; |
|
} |
|
else { |
|
this[BUFFER][0] = c.subarray(n); |
|
chunk = c.subarray(0, n); |
|
this[BUFFERLENGTH] -= n; |
|
} |
|
} |
|
this.emit('data', chunk); |
|
if (!this[BUFFER].length && !this[EOF]) |
|
this.emit('drain'); |
|
return chunk; |
|
} |
|
end(chunk, encoding, cb) { |
|
if (typeof chunk === 'function') { |
|
cb = chunk; |
|
chunk = undefined; |
|
} |
|
if (typeof encoding === 'function') { |
|
cb = encoding; |
|
encoding = 'utf8'; |
|
} |
|
if (chunk !== undefined) |
|
this.write(chunk, encoding); |
|
if (cb) |
|
this.once('end', cb); |
|
this[EOF] = true; |
|
this.writable = false; |
|
// if we haven't written anything, then go ahead and emit, |
|
// even if we're not reading. |
|
// we'll re-emit if a new 'end' listener is added anyway. |
|
// This makes MP more suitable to write-only use cases. |
|
if (this[FLOWING] || !this[PAUSED]) |
|
this[MAYBE_EMIT_END](); |
|
return this; |
|
} |
|
// don't let the internal resume be overwritten |
|
[RESUME]() { |
|
if (this[DESTROYED]) |
|
return; |
|
if (!this[DATALISTENERS] && !this[PIPES].length) { |
|
this[DISCARDED] = true; |
|
} |
|
this[PAUSED] = false; |
|
this[FLOWING] = true; |
|
this.emit('resume'); |
|
if (this[BUFFER].length) |
|
this[FLUSH](); |
|
else if (this[EOF]) |
|
this[MAYBE_EMIT_END](); |
|
else |
|
this.emit('drain'); |
|
} |
|
/** |
|
* Resume the stream if it is currently in a paused state |
|
* |
|
* If called when there are no pipe destinations or `data` event listeners, |
|
* this will place the stream in a "discarded" state, where all data will |
|
* be thrown away. The discarded state is removed if a pipe destination or |
|
* data handler is added, if pause() is called, or if any synchronous or |
|
* asynchronous iteration is started. |
|
*/ |
|
resume() { |
|
return this[RESUME](); |
|
} |
|
/** |
|
* Pause the stream |
|
*/ |
|
pause() { |
|
this[FLOWING] = false; |
|
this[PAUSED] = true; |
|
this[DISCARDED] = false; |
|
} |
|
/** |
|
* true if the stream has been forcibly destroyed |
|
*/ |
|
get destroyed() { |
|
return this[DESTROYED]; |
|
} |
|
/** |
|
* true if the stream is currently in a flowing state, meaning that |
|
* any writes will be immediately emitted. |
|
*/ |
|
get flowing() { |
|
return this[FLOWING]; |
|
} |
|
/** |
|
* true if the stream is currently in a paused state |
|
*/ |
|
get paused() { |
|
return this[PAUSED]; |
|
} |
|
[BUFFERPUSH](chunk) { |
|
if (this[OBJECTMODE]) |
|
this[BUFFERLENGTH] += 1; |
|
else |
|
this[BUFFERLENGTH] += chunk.length; |
|
this[BUFFER].push(chunk); |
|
} |
|
[BUFFERSHIFT]() { |
|
if (this[OBJECTMODE]) |
|
this[BUFFERLENGTH] -= 1; |
|
else |
|
this[BUFFERLENGTH] -= this[BUFFER][0].length; |
|
return this[BUFFER].shift(); |
|
} |
|
[FLUSH](noDrain = false) { |
|
do { } while (this[FLUSHCHUNK](this[BUFFERSHIFT]()) && |
|
this[BUFFER].length); |
|
if (!noDrain && !this[BUFFER].length && !this[EOF]) |
|
this.emit('drain'); |
|
} |
|
[FLUSHCHUNK](chunk) { |
|
this.emit('data', chunk); |
|
return this[FLOWING]; |
|
} |
|
/** |
|
* Pipe all data emitted by this stream into the destination provided. |
|
* |
|
* Triggers the flow of data. |
|
*/ |
|
pipe(dest, opts) { |
|
if (this[DESTROYED]) |
|
return dest; |
|
this[DISCARDED] = false; |
|
const ended = this[EMITTED_END]; |
|
opts = opts || {}; |
|
if (dest === proc.stdout || dest === proc.stderr) |
|
opts.end = false; |
|
else |
|
opts.end = opts.end !== false; |
|
opts.proxyErrors = !!opts.proxyErrors; |
|
// piping an ended stream ends immediately |
|
if (ended) { |
|
if (opts.end) |
|
dest.end(); |
|
} |
|
else { |
|
// "as" here just ignores the WType, which pipes don't care about, |
|
// since they're only consuming from us, and writing to the dest |
|
this[PIPES].push(!opts.proxyErrors |
|
? new Pipe(this, dest, opts) |
|
: new PipeProxyErrors(this, dest, opts)); |
|
if (this[ASYNC]) |
|
defer(() => this[RESUME]()); |
|
else |
|
this[RESUME](); |
|
} |
|
return dest; |
|
} |
|
/** |
|
* Fully unhook a piped destination stream. |
|
* |
|
* If the destination stream was the only consumer of this stream (ie, |
|
* there are no other piped destinations or `'data'` event listeners) |
|
* then the flow of data will stop until there is another consumer or |
|
* {@link Minipass#resume} is explicitly called. |
|
*/ |
|
unpipe(dest) { |
|
const p = this[PIPES].find(p => p.dest === dest); |
|
if (p) { |
|
if (this[PIPES].length === 1) { |
|
if (this[FLOWING] && this[DATALISTENERS] === 0) { |
|
this[FLOWING] = false; |
|
} |
|
this[PIPES] = []; |
|
} |
|
else |
|
this[PIPES].splice(this[PIPES].indexOf(p), 1); |
|
p.unpipe(); |
|
} |
|
} |
|
/** |
|
* Alias for {@link Minipass#on} |
|
*/ |
|
addListener(ev, handler) { |
|
return this.on(ev, handler); |
|
} |
|
/** |
|
* Mostly identical to `EventEmitter.on`, with the following |
|
* behavior differences to prevent data loss and unnecessary hangs: |
|
* |
|
* - Adding a 'data' event handler will trigger the flow of data |
|
* |
|
* - Adding a 'readable' event handler when there is data waiting to be read |
|
* will cause 'readable' to be emitted immediately. |
|
* |
|
* - Adding an 'endish' event handler ('end', 'finish', etc.) which has |
|
* already passed will cause the event to be emitted immediately and all |
|
* handlers removed. |
|
* |
|
* - Adding an 'error' event handler after an error has been emitted will |
|
* cause the event to be re-emitted immediately with the error previously |
|
* raised. |
|
*/ |
|
on(ev, handler) { |
|
const ret = super.on(ev, handler); |
|
if (ev === 'data') { |
|
this[DISCARDED] = false; |
|
this[DATALISTENERS]++; |
|
if (!this[PIPES].length && !this[FLOWING]) { |
|
this[RESUME](); |
|
} |
|
} |
|
else if (ev === 'readable' && this[BUFFERLENGTH] !== 0) { |
|
super.emit('readable'); |
|
} |
|
else if (isEndish(ev) && this[EMITTED_END]) { |
|
super.emit(ev); |
|
this.removeAllListeners(ev); |
|
} |
|
else if (ev === 'error' && this[EMITTED_ERROR]) { |
|
const h = handler; |
|
if (this[ASYNC]) |
|
defer(() => h.call(this, this[EMITTED_ERROR])); |
|
else |
|
h.call(this, this[EMITTED_ERROR]); |
|
} |
|
return ret; |
|
} |
|
/** |
|
* Alias for {@link Minipass#off} |
|
*/ |
|
removeListener(ev, handler) { |
|
return this.off(ev, handler); |
|
} |
|
/** |
|
* Mostly identical to `EventEmitter.off` |
|
* |
|
* If a 'data' event handler is removed, and it was the last consumer |
|
* (ie, there are no pipe destinations or other 'data' event listeners), |
|
* then the flow of data will stop until there is another consumer or |
|
* {@link Minipass#resume} is explicitly called. |
|
*/ |
|
off(ev, handler) { |
|
const ret = super.off(ev, handler); |
|
// if we previously had listeners, and now we don't, and we don't |
|
// have any pipes, then stop the flow, unless it's been explicitly |
|
// put in a discarded flowing state via stream.resume(). |
|
if (ev === 'data') { |
|
this[DATALISTENERS] = this.listeners('data').length; |
|
if (this[DATALISTENERS] === 0 && |
|
!this[DISCARDED] && |
|
!this[PIPES].length) { |
|
this[FLOWING] = false; |
|
} |
|
} |
|
return ret; |
|
} |
|
/** |
|
* Mostly identical to `EventEmitter.removeAllListeners` |
|
* |
|
* If all 'data' event handlers are removed, and they were the last consumer |
|
* (ie, there are no pipe destinations), then the flow of data will stop |
|
* until there is another consumer or {@link Minipass#resume} is explicitly |
|
* called. |
|
*/ |
|
removeAllListeners(ev) { |
|
const ret = super.removeAllListeners(ev); |
|
if (ev === 'data' || ev === undefined) { |
|
this[DATALISTENERS] = 0; |
|
if (!this[DISCARDED] && !this[PIPES].length) { |
|
this[FLOWING] = false; |
|
} |
|
} |
|
return ret; |
|
} |
|
/** |
|
* true if the 'end' event has been emitted |
|
*/ |
|
get emittedEnd() { |
|
return this[EMITTED_END]; |
|
} |
|
[MAYBE_EMIT_END]() { |
|
if (!this[EMITTING_END] && |
|
!this[EMITTED_END] && |
|
!this[DESTROYED] && |
|
this[BUFFER].length === 0 && |
|
this[EOF]) { |
|
this[EMITTING_END] = true; |
|
this.emit('end'); |
|
this.emit('prefinish'); |
|
this.emit('finish'); |
|
if (this[CLOSED]) |
|
this.emit('close'); |
|
this[EMITTING_END] = false; |
|
} |
|
} |
|
/** |
|
* Mostly identical to `EventEmitter.emit`, with the following |
|
* behavior differences to prevent data loss and unnecessary hangs: |
|
* |
|
* If the stream has been destroyed, and the event is something other |
|
* than 'close' or 'error', then `false` is returned and no handlers |
|
* are called. |
|
* |
|
* If the event is 'end', and has already been emitted, then the event |
|
* is ignored. If the stream is in a paused or non-flowing state, then |
|
* the event will be deferred until data flow resumes. If the stream is |
|
* async, then handlers will be called on the next tick rather than |
|
* immediately. |
|
* |
|
* If the event is 'close', and 'end' has not yet been emitted, then |
|
* the event will be deferred until after 'end' is emitted. |
|
* |
|
* If the event is 'error', and an AbortSignal was provided for the stream, |
|
* and there are no listeners, then the event is ignored, matching the |
|
* behavior of node core streams in the presense of an AbortSignal. |
|
* |
|
* If the event is 'finish' or 'prefinish', then all listeners will be |
|
* removed after emitting the event, to prevent double-firing. |
|
*/ |
|
emit(ev, ...args) { |
|
const data = args[0]; |
|
// error and close are only events allowed after calling destroy() |
|
if (ev !== 'error' && |
|
ev !== 'close' && |
|
ev !== DESTROYED && |
|
this[DESTROYED]) { |
|
return false; |
|
} |
|
else if (ev === 'data') { |
|
return !this[OBJECTMODE] && !data |
|
? false |
|
: this[ASYNC] |
|
? (defer(() => this[EMITDATA](data)), true) |
|
: this[EMITDATA](data); |
|
} |
|
else if (ev === 'end') { |
|
return this[EMITEND](); |
|
} |
|
else if (ev === 'close') { |
|
this[CLOSED] = true; |
|
// don't emit close before 'end' and 'finish' |
|
if (!this[EMITTED_END] && !this[DESTROYED]) |
|
return false; |
|
const ret = super.emit('close'); |
|
this.removeAllListeners('close'); |
|
return ret; |
|
} |
|
else if (ev === 'error') { |
|
this[EMITTED_ERROR] = data; |
|
super.emit(ERROR, data); |
|
const ret = !this[SIGNAL] || this.listeners('error').length |
|
? super.emit('error', data) |
|
: false; |
|
this[MAYBE_EMIT_END](); |
|
return ret; |
|
} |
|
else if (ev === 'resume') { |
|
const ret = super.emit('resume'); |
|
this[MAYBE_EMIT_END](); |
|
return ret; |
|
} |
|
else if (ev === 'finish' || ev === 'prefinish') { |
|
const ret = super.emit(ev); |
|
this.removeAllListeners(ev); |
|
return ret; |
|
} |
|
// Some other unknown event |
|
const ret = super.emit(ev, ...args); |
|
this[MAYBE_EMIT_END](); |
|
return ret; |
|
} |
|
[EMITDATA](data) { |
|
for (const p of this[PIPES]) { |
|
if (p.dest.write(data) === false) |
|
this.pause(); |
|
} |
|
const ret = this[DISCARDED] ? false : super.emit('data', data); |
|
this[MAYBE_EMIT_END](); |
|
return ret; |
|
} |
|
[EMITEND]() { |
|
if (this[EMITTED_END]) |
|
return false; |
|
this[EMITTED_END] = true; |
|
this.readable = false; |
|
return this[ASYNC] |
|
? (defer(() => this[EMITEND2]()), true) |
|
: this[EMITEND2](); |
|
} |
|
[EMITEND2]() { |
|
if (this[DECODER]) { |
|
const data = this[DECODER].end(); |
|
if (data) { |
|
for (const p of this[PIPES]) { |
|
p.dest.write(data); |
|
} |
|
if (!this[DISCARDED]) |
|
super.emit('data', data); |
|
} |
|
} |
|
for (const p of this[PIPES]) { |
|
p.end(); |
|
} |
|
const ret = super.emit('end'); |
|
this.removeAllListeners('end'); |
|
return ret; |
|
} |
|
/** |
|
* Return a Promise that resolves to an array of all emitted data once |
|
* the stream ends. |
|
*/ |
|
async collect() { |
|
const buf = Object.assign([], { |
|
dataLength: 0, |
|
}); |
|
if (!this[OBJECTMODE]) |
|
buf.dataLength = 0; |
|
// set the promise first, in case an error is raised |
|
// by triggering the flow here. |
|
const p = this.promise(); |
|
this.on('data', c => { |
|
buf.push(c); |
|
if (!this[OBJECTMODE]) |
|
buf.dataLength += c.length; |
|
}); |
|
await p; |
|
return buf; |
|
} |
|
/** |
|
* Return a Promise that resolves to the concatenation of all emitted data |
|
* once the stream ends. |
|
* |
|
* Not allowed on objectMode streams. |
|
*/ |
|
async concat() { |
|
if (this[OBJECTMODE]) { |
|
throw new Error('cannot concat in objectMode'); |
|
} |
|
const buf = await this.collect(); |
|
return (this[ENCODING] |
|
? buf.join('') |
|
: Buffer.concat(buf, buf.dataLength)); |
|
} |
|
/** |
|
* Return a void Promise that resolves once the stream ends. |
|
*/ |
|
async promise() { |
|
return new Promise((resolve, reject) => { |
|
this.on(DESTROYED, () => reject(new Error('stream destroyed'))); |
|
this.on('error', er => reject(er)); |
|
this.on('end', () => resolve()); |
|
}); |
|
} |
|
/** |
|
* Asynchronous `for await of` iteration. |
|
* |
|
* This will continue emitting all chunks until the stream terminates. |
|
*/ |
|
[Symbol.asyncIterator]() { |
|
// set this up front, in case the consumer doesn't call next() |
|
// right away. |
|
this[DISCARDED] = false; |
|
let stopped = false; |
|
const stop = async () => { |
|
this.pause(); |
|
stopped = true; |
|
return { value: undefined, done: true }; |
|
}; |
|
const next = () => { |
|
if (stopped) |
|
return stop(); |
|
const res = this.read(); |
|
if (res !== null) |
|
return Promise.resolve({ done: false, value: res }); |
|
if (this[EOF]) |
|
return stop(); |
|
let resolve; |
|
let reject; |
|
const onerr = (er) => { |
|
this.off('data', ondata); |
|
this.off('end', onend); |
|
this.off(DESTROYED, ondestroy); |
|
stop(); |
|
reject(er); |
|
}; |
|
const ondata = (value) => { |
|
this.off('error', onerr); |
|
this.off('end', onend); |
|
this.off(DESTROYED, ondestroy); |
|
this.pause(); |
|
resolve({ value, done: !!this[EOF] }); |
|
}; |
|
const onend = () => { |
|
this.off('error', onerr); |
|
this.off('data', ondata); |
|
this.off(DESTROYED, ondestroy); |
|
stop(); |
|
resolve({ done: true, value: undefined }); |
|
}; |
|
const ondestroy = () => onerr(new Error('stream destroyed')); |
|
return new Promise((res, rej) => { |
|
reject = rej; |
|
resolve = res; |
|
this.once(DESTROYED, ondestroy); |
|
this.once('error', onerr); |
|
this.once('end', onend); |
|
this.once('data', ondata); |
|
}); |
|
}; |
|
return { |
|
next, |
|
throw: stop, |
|
return: stop, |
|
[Symbol.asyncIterator]() { |
|
return this; |
|
}, |
|
}; |
|
} |
|
/** |
|
* Synchronous `for of` iteration. |
|
* |
|
* The iteration will terminate when the internal buffer runs out, even |
|
* if the stream has not yet terminated. |
|
*/ |
|
[Symbol.iterator]() { |
|
// set this up front, in case the consumer doesn't call next() |
|
// right away. |
|
this[DISCARDED] = false; |
|
let stopped = false; |
|
const stop = () => { |
|
this.pause(); |
|
this.off(ERROR, stop); |
|
this.off(DESTROYED, stop); |
|
this.off('end', stop); |
|
stopped = true; |
|
return { done: true, value: undefined }; |
|
}; |
|
const next = () => { |
|
if (stopped) |
|
return stop(); |
|
const value = this.read(); |
|
return value === null ? stop() : { done: false, value }; |
|
}; |
|
this.once('end', stop); |
|
this.once(ERROR, stop); |
|
this.once(DESTROYED, stop); |
|
return { |
|
next, |
|
throw: stop, |
|
return: stop, |
|
[Symbol.iterator]() { |
|
return this; |
|
}, |
|
}; |
|
} |
|
/** |
|
* Destroy a stream, preventing it from being used for any further purpose. |
|
* |
|
* If the stream has a `close()` method, then it will be called on |
|
* destruction. |
|
* |
|
* After destruction, any attempt to write data, read data, or emit most |
|
* events will be ignored. |
|
* |
|
* If an error argument is provided, then it will be emitted in an |
|
* 'error' event. |
|
*/ |
|
destroy(er) { |
|
if (this[DESTROYED]) { |
|
if (er) |
|
this.emit('error', er); |
|
else |
|
this.emit(DESTROYED); |
|
return this; |
|
} |
|
this[DESTROYED] = true; |
|
this[DISCARDED] = true; |
|
// throw away all buffered data, it's never coming out |
|
this[BUFFER].length = 0; |
|
this[BUFFERLENGTH] = 0; |
|
const wc = this; |
|
if (typeof wc.close === 'function' && !this[CLOSED]) |
|
wc.close(); |
|
if (er) |
|
this.emit('error', er); |
|
// if no error to emit, still reject pending promises |
|
else |
|
this.emit(DESTROYED); |
|
return this; |
|
} |
|
/** |
|
* Alias for {@link isStream} |
|
* |
|
* Former export location, maintained for backwards compatibility. |
|
* |
|
* @deprecated |
|
*/ |
|
static get isStream() { |
|
return isStream; |
|
} |
|
} |
|
//# sourceMappingURL=index.js.map
|