2023-07-24 11:13:08 +08:00
// Copyright Joyent, Inc. and other Node contributors.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the
// "Software"), to deal in the Software without restriction, including
// without limitation the rights to use, copy, modify, merge, publish,
// distribute, sublicense, and/or sell copies of the Software, and to permit
// persons to whom the Software is furnished to do so, subject to the
// following conditions:
//
// The above copyright notice and this permission notice shall be included
// in all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
// USE OR OTHER DEALINGS IN THE SOFTWARE.
'use strict' ;
module . exports = Readable ;
/*<replacement>*/
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
var Duplex ;
/*</replacement>*/
Readable . ReadableState = ReadableState ;
/*<replacement>*/
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
var EE = require ( 'events' ) . EventEmitter ;
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
var EElistenerCount = function EElistenerCount ( emitter , type ) {
return emitter . listeners ( type ) . length ;
} ;
/*</replacement>*/
/*<replacement>*/
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
var Stream = require ( './internal/streams/stream' ) ;
/*</replacement>*/
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
var Buffer = require ( 'buffer' ) . Buffer ;
2023-07-27 15:04:13 +08:00
var OurUint8Array = global . Uint8Array || function ( ) { } ;
2023-07-24 11:13:08 +08:00
function _uint8ArrayToBuffer ( chunk ) {
return Buffer . from ( chunk ) ;
}
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
function _isUint8Array ( obj ) {
return Buffer . isBuffer ( obj ) || obj instanceof OurUint8Array ;
}
/*<replacement>*/
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
var debugUtil = require ( 'util' ) ;
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
var debug ;
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
if ( debugUtil && debugUtil . debuglog ) {
debug = debugUtil . debuglog ( 'stream' ) ;
} else {
debug = function debug ( ) { } ;
}
/*</replacement>*/
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
var BufferList = require ( './internal/streams/buffer_list' ) ;
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
var destroyImpl = require ( './internal/streams/destroy' ) ;
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
var _require = require ( './internal/streams/state' ) ,
2023-07-27 15:04:13 +08:00
getHighWaterMark = _require . getHighWaterMark ;
2023-07-24 11:13:08 +08:00
var _require$codes = require ( '../errors' ) . codes ,
2023-07-27 15:04:13 +08:00
ERR _INVALID _ARG _TYPE = _require$codes . ERR _INVALID _ARG _TYPE ,
ERR _STREAM _PUSH _AFTER _EOF = _require$codes . ERR _STREAM _PUSH _AFTER _EOF ,
ERR _METHOD _NOT _IMPLEMENTED = _require$codes . ERR _METHOD _NOT _IMPLEMENTED ,
ERR _STREAM _UNSHIFT _AFTER _END _EVENT = _require$codes . ERR _STREAM _UNSHIFT _AFTER _END _EVENT ; // Lazy loaded to improve the startup performance.
2023-07-24 11:13:08 +08:00
var StringDecoder ;
var createReadableStreamAsyncIterator ;
var from ;
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
require ( 'inherits' ) ( Readable , Stream ) ;
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
var errorOrDestroy = destroyImpl . errorOrDestroy ;
var kProxyEvents = [ 'error' , 'close' , 'destroy' , 'pause' , 'resume' ] ;
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
function prependListener ( emitter , event , fn ) {
// Sadly this is not cacheable as some libraries bundle their own
// event emitter implementation with them.
2023-07-27 15:04:13 +08:00
if ( typeof emitter . prependListener === 'function' ) return emitter . prependListener ( event , fn ) ; // This is a hack to make sure that our error handler is attached before any
2023-07-24 11:13:08 +08:00
// userland ones. NEVER DO THIS. This is here only because this code needs
// to continue to work with older versions of Node.js that do not include
// the prependListener() method. The goal is to eventually remove this hack.
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
if ( ! emitter . _events || ! emitter . _events [ event ] ) emitter . on ( event , fn ) ; else if ( Array . isArray ( emitter . _events [ event ] ) ) emitter . _events [ event ] . unshift ( fn ) ; else emitter . _events [ event ] = [ fn , emitter . _events [ event ] ] ;
}
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
function ReadableState ( options , stream , isDuplex ) {
Duplex = Duplex || require ( './_stream_duplex' ) ;
2023-07-27 15:04:13 +08:00
options = options || { } ; // Duplex streams are both readable and writable, but share
2023-07-24 11:13:08 +08:00
// the same options object.
// However, some cases require setting options to different
// values for the readable and the writable sides of the duplex stream.
// These options can be provided separately as readableXXX and writableXXX.
2023-07-27 15:04:13 +08:00
if ( typeof isDuplex !== 'boolean' ) isDuplex = stream instanceof Duplex ; // object stream flag. Used to make read(n) ignore n and to
2023-07-24 11:13:08 +08:00
// make all the buffer merging and length checks go away
2023-07-27 15:04:13 +08:00
this . objectMode = ! ! options . objectMode ;
if ( isDuplex ) this . objectMode = this . objectMode || ! ! options . readableObjectMode ; // the point at which it stops calling _read() to fill the buffer
2023-07-24 11:13:08 +08:00
// Note: 0 is a valid value, means "don't call _read preemptively ever"
2023-07-27 15:04:13 +08:00
this . highWaterMark = getHighWaterMark ( this , options , 'readableHighWaterMark' , isDuplex ) ; // A linked list is used to store data chunks instead of an array because the
2023-07-24 11:13:08 +08:00
// linked list can remove elements from the beginning faster than
// array.shift()
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
this . buffer = new BufferList ( ) ;
this . length = 0 ;
this . pipes = null ;
this . pipesCount = 0 ;
this . flowing = null ;
this . ended = false ;
this . endEmitted = false ;
2023-07-27 15:04:13 +08:00
this . reading = false ; // a flag to be able to tell if the event 'readable'/'data' is emitted
2023-07-24 11:13:08 +08:00
// immediately, or on a later tick. We set this to true at first, because
// any actions that shouldn't happen until "later" should generally also
// not happen before the first read call.
2023-07-27 15:04:13 +08:00
this . sync = true ; // whenever we return null, then we set a flag to say
2023-07-24 11:13:08 +08:00
// that we're awaiting a 'readable' event emission.
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
this . needReadable = false ;
this . emittedReadable = false ;
this . readableListening = false ;
this . resumeScheduled = false ;
2023-07-27 15:04:13 +08:00
this . paused = true ; // Should close be emitted on destroy. Defaults to true.
2023-07-24 11:13:08 +08:00
2023-07-27 15:04:13 +08:00
this . emitClose = options . emitClose !== false ; // Should .destroy() be called after 'end' (and potentially 'finish')
2023-07-24 11:13:08 +08:00
2023-07-27 15:04:13 +08:00
this . autoDestroy = ! ! options . autoDestroy ; // has it been destroyed
2023-07-24 11:13:08 +08:00
2023-07-27 15:04:13 +08:00
this . destroyed = false ; // Crypto is kind of old and crusty. Historically, its default string
2023-07-24 11:13:08 +08:00
// encoding is 'binary' so we have to make this configurable.
// Everything else in the universe uses 'utf8', though.
2023-07-27 15:04:13 +08:00
this . defaultEncoding = options . defaultEncoding || 'utf8' ; // the number of writers that are awaiting a drain event in .pipe()s
this . awaitDrain = 0 ; // if true, a maybeReadMore has been scheduled
2023-07-24 11:13:08 +08:00
this . readingMore = false ;
this . decoder = null ;
this . encoding = null ;
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
if ( options . encoding ) {
if ( ! StringDecoder ) StringDecoder = require ( 'string_decoder/' ) . StringDecoder ;
this . decoder = new StringDecoder ( options . encoding ) ;
this . encoding = options . encoding ;
}
}
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
function Readable ( options ) {
Duplex = Duplex || require ( './_stream_duplex' ) ;
2023-07-27 15:04:13 +08:00
if ( ! ( this instanceof Readable ) ) return new Readable ( options ) ; // Checking for a Stream.Duplex instance is faster here instead of inside
2023-07-24 11:13:08 +08:00
// the ReadableState constructor, at least with V8 6.5
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
var isDuplex = this instanceof Duplex ;
2023-07-27 15:04:13 +08:00
this . _readableState = new ReadableState ( options , this , isDuplex ) ; // legacy
2023-07-24 11:13:08 +08:00
this . readable = true ;
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
if ( options ) {
if ( typeof options . read === 'function' ) this . _read = options . read ;
if ( typeof options . destroy === 'function' ) this . _destroy = options . destroy ;
}
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
Stream . call ( this ) ;
}
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
Object . defineProperty ( Readable . prototype , 'destroyed' , {
// making it explicit this property is not enumerable
// because otherwise some prototype manipulation in
// userland will fail
enumerable : false ,
get : function get ( ) {
if ( this . _readableState === undefined ) {
return false ;
}
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
return this . _readableState . destroyed ;
} ,
set : function set ( value ) {
// we ignore the value if the stream
// has not been initialized yet
if ( ! this . _readableState ) {
return ;
2023-07-27 15:04:13 +08:00
} // backward compatibility, the user is explicitly
2023-07-24 11:13:08 +08:00
// managing destroyed
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
this . _readableState . destroyed = value ;
}
} ) ;
Readable . prototype . destroy = destroyImpl . destroy ;
Readable . prototype . _undestroy = destroyImpl . undestroy ;
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
Readable . prototype . _destroy = function ( err , cb ) {
cb ( err ) ;
2023-07-27 15:04:13 +08:00
} ; // Manually shove something into the read() buffer.
2023-07-24 11:13:08 +08:00
// This returns true if the highWaterMark has not been hit yet,
// similar to how Writable.write() returns true if you should
// write() some more.
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
Readable . prototype . push = function ( chunk , encoding ) {
var state = this . _readableState ;
var skipChunkCheck ;
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
if ( ! state . objectMode ) {
if ( typeof chunk === 'string' ) {
encoding = encoding || state . defaultEncoding ;
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
if ( encoding !== state . encoding ) {
chunk = Buffer . from ( chunk , encoding ) ;
encoding = '' ;
}
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
skipChunkCheck = true ;
}
} else {
skipChunkCheck = true ;
}
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
return readableAddChunk ( this , chunk , encoding , false , skipChunkCheck ) ;
2023-07-27 15:04:13 +08:00
} ; // Unshift should *always* be something directly out of read()
2023-07-24 11:13:08 +08:00
Readable . prototype . unshift = function ( chunk ) {
return readableAddChunk ( this , chunk , null , true , false ) ;
} ;
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
function readableAddChunk ( stream , chunk , encoding , addToFront , skipChunkCheck ) {
debug ( 'readableAddChunk' , chunk ) ;
var state = stream . _readableState ;
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
if ( chunk === null ) {
state . reading = false ;
onEofChunk ( stream , state ) ;
} else {
var er ;
if ( ! skipChunkCheck ) er = chunkInvalid ( state , chunk ) ;
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
if ( er ) {
errorOrDestroy ( stream , er ) ;
} else if ( state . objectMode || chunk && chunk . length > 0 ) {
if ( typeof chunk !== 'string' && ! state . objectMode && Object . getPrototypeOf ( chunk ) !== Buffer . prototype ) {
chunk = _uint8ArrayToBuffer ( chunk ) ;
}
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
if ( addToFront ) {
if ( state . endEmitted ) errorOrDestroy ( stream , new ERR _STREAM _UNSHIFT _AFTER _END _EVENT ( ) ) ; else addChunk ( stream , state , chunk , true ) ;
} else if ( state . ended ) {
errorOrDestroy ( stream , new ERR _STREAM _PUSH _AFTER _EOF ( ) ) ;
} else if ( state . destroyed ) {
return false ;
} else {
state . reading = false ;
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
if ( state . decoder && ! encoding ) {
chunk = state . decoder . write ( chunk ) ;
if ( state . objectMode || chunk . length !== 0 ) addChunk ( stream , state , chunk , false ) ; else maybeReadMore ( stream , state ) ;
} else {
addChunk ( stream , state , chunk , false ) ;
}
}
} else if ( ! addToFront ) {
state . reading = false ;
maybeReadMore ( stream , state ) ;
}
2023-07-27 15:04:13 +08:00
} // We can push more data if we are below the highWaterMark.
2023-07-24 11:13:08 +08:00
// Also, if we have no data yet, we can stand some more bytes.
// This is to work around cases where hwm=0, such as the repl.
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
return ! state . ended && ( state . length < state . highWaterMark || state . length === 0 ) ;
}
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
function addChunk ( stream , state , chunk , addToFront ) {
if ( state . flowing && state . length === 0 && ! state . sync ) {
state . awaitDrain = 0 ;
stream . emit ( 'data' , chunk ) ;
} else {
// update the buffer info.
state . length += state . objectMode ? 1 : chunk . length ;
if ( addToFront ) state . buffer . unshift ( chunk ) ; else state . buffer . push ( chunk ) ;
if ( state . needReadable ) emitReadable ( stream ) ;
}
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
maybeReadMore ( stream , state ) ;
}
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
function chunkInvalid ( state , chunk ) {
var er ;
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
if ( ! _isUint8Array ( chunk ) && typeof chunk !== 'string' && chunk !== undefined && ! state . objectMode ) {
er = new ERR _INVALID _ARG _TYPE ( 'chunk' , [ 'string' , 'Buffer' , 'Uint8Array' ] , chunk ) ;
}
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
return er ;
}
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
Readable . prototype . isPaused = function ( ) {
return this . _readableState . flowing === false ;
2023-07-27 15:04:13 +08:00
} ; // backwards compatibility.
2023-07-24 11:13:08 +08:00
Readable . prototype . setEncoding = function ( enc ) {
if ( ! StringDecoder ) StringDecoder = require ( 'string_decoder/' ) . StringDecoder ;
var decoder = new StringDecoder ( enc ) ;
2023-07-27 15:04:13 +08:00
this . _readableState . decoder = decoder ; // If setEncoding(null), decoder.encoding equals utf8
this . _readableState . encoding = this . _readableState . decoder . encoding ; // Iterate over current buffer to convert already stored Buffers:
2023-07-24 11:13:08 +08:00
var p = this . _readableState . buffer . head ;
var content = '' ;
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
while ( p !== null ) {
content += decoder . write ( p . data ) ;
p = p . next ;
}
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
this . _readableState . buffer . clear ( ) ;
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
if ( content !== '' ) this . _readableState . buffer . push ( content ) ;
this . _readableState . length = content . length ;
return this ;
2023-07-27 15:04:13 +08:00
} ; // Don't raise the hwm > 1GB
2023-07-24 11:13:08 +08:00
var MAX _HWM = 0x40000000 ;
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
function computeNewHighWaterMark ( n ) {
if ( n >= MAX _HWM ) {
// TODO(ronag): Throw ERR_VALUE_OUT_OF_RANGE.
n = MAX _HWM ;
} else {
// Get the next highest power of 2 to prevent increasing hwm excessively in
// tiny amounts
n -- ;
n |= n >>> 1 ;
n |= n >>> 2 ;
n |= n >>> 4 ;
n |= n >>> 8 ;
n |= n >>> 16 ;
n ++ ;
}
2023-07-27 15:04:13 +08:00
return n ;
} // This function is designed to be inlinable, so please take care when making
2023-07-24 11:13:08 +08:00
// changes to the function body.
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
function howMuchToRead ( n , state ) {
if ( n <= 0 || state . length === 0 && state . ended ) return 0 ;
if ( state . objectMode ) return 1 ;
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
if ( n !== n ) {
// Only flow one buffer at a time
if ( state . flowing && state . length ) return state . buffer . head . data . length ; else return state . length ;
2023-07-27 15:04:13 +08:00
} // If we're asking for more than the current hwm, then raise the hwm.
2023-07-24 11:13:08 +08:00
if ( n > state . highWaterMark ) state . highWaterMark = computeNewHighWaterMark ( n ) ;
2023-07-27 15:04:13 +08:00
if ( n <= state . length ) return n ; // Don't have enough
2023-07-24 11:13:08 +08:00
if ( ! state . ended ) {
state . needReadable = true ;
return 0 ;
}
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
return state . length ;
2023-07-27 15:04:13 +08:00
} // you can override either this method, or the async _read(n) below.
2023-07-24 11:13:08 +08:00
Readable . prototype . read = function ( n ) {
debug ( 'read' , n ) ;
n = parseInt ( n , 10 ) ;
var state = this . _readableState ;
var nOrig = n ;
2023-07-27 15:04:13 +08:00
if ( n !== 0 ) state . emittedReadable = false ; // if we're doing read(0) to trigger a readable event, but we
2023-07-24 11:13:08 +08:00
// already have a bunch of data in the buffer, then just trigger
// the 'readable' event and move on.
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
if ( n === 0 && state . needReadable && ( ( state . highWaterMark !== 0 ? state . length >= state . highWaterMark : state . length > 0 ) || state . ended ) ) {
debug ( 'read: emitReadable' , state . length , state . ended ) ;
if ( state . length === 0 && state . ended ) endReadable ( this ) ; else emitReadable ( this ) ;
return null ;
}
2023-07-27 15:04:13 +08:00
n = howMuchToRead ( n , state ) ; // if we've ended, and we're now clear, then finish it up.
2023-07-24 11:13:08 +08:00
if ( n === 0 && state . ended ) {
if ( state . length === 0 ) endReadable ( this ) ;
return null ;
2023-07-27 15:04:13 +08:00
} // All the actual chunk generation logic needs to be
2023-07-24 11:13:08 +08:00
// *below* the call to _read. The reason is that in certain
// synthetic stream cases, such as passthrough streams, _read
// may be a completely synchronous operation which may change
// the state of the read buffer, providing enough data when
// before there was *not* enough.
//
// So, the steps are:
// 1. Figure out what the state of things will be after we do
// a read from the buffer.
//
// 2. If that resulting state will trigger a _read, then call _read.
// Note that this may be asynchronous, or synchronous. Yes, it is
// deeply ugly to write APIs this way, but that still doesn't mean
// that the Readable class should behave improperly, as streams are
// designed to be sync/async agnostic.
// Take note if the _read call is sync or async (ie, if the read call
// has returned yet), so that we know whether or not it's safe to emit
// 'readable' etc.
//
// 3. Actually pull the requested chunks out of the buffer and return.
// if we need a readable event, then we need to do some reading.
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
var doRead = state . needReadable ;
2023-07-27 15:04:13 +08:00
debug ( 'need readable' , doRead ) ; // if we currently have less than the highWaterMark, then also read some
2023-07-24 11:13:08 +08:00
if ( state . length === 0 || state . length - n < state . highWaterMark ) {
doRead = true ;
debug ( 'length less than watermark' , doRead ) ;
2023-07-27 15:04:13 +08:00
} // however, if we've ended, then there's no point, and if we're already
2023-07-24 11:13:08 +08:00
// reading, then it's unnecessary.
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
if ( state . ended || state . reading ) {
doRead = false ;
debug ( 'reading or ended' , doRead ) ;
} else if ( doRead ) {
debug ( 'do read' ) ;
state . reading = true ;
2023-07-27 15:04:13 +08:00
state . sync = true ; // if the length is currently zero, then we *need* a readable event.
if ( state . length === 0 ) state . needReadable = true ; // call internal read method
2023-07-24 11:13:08 +08:00
this . _read ( state . highWaterMark ) ;
2023-07-27 15:04:13 +08:00
state . sync = false ; // If _read pushed data synchronously, then `reading` will be false,
2023-07-24 11:13:08 +08:00
// and we need to re-evaluate how much data we can return to the user.
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
if ( ! state . reading ) n = howMuchToRead ( nOrig , state ) ;
}
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
var ret ;
if ( n > 0 ) ret = fromList ( n , state ) ; else ret = null ;
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
if ( ret === null ) {
state . needReadable = state . length <= state . highWaterMark ;
n = 0 ;
} else {
state . length -= n ;
state . awaitDrain = 0 ;
}
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
if ( state . length === 0 ) {
// If we have nothing in the buffer, then we want to know
// as soon as we *do* get something into the buffer.
2023-07-27 15:04:13 +08:00
if ( ! state . ended ) state . needReadable = true ; // If we tried to read() past the EOF, then emit end on the next tick.
2023-07-24 11:13:08 +08:00
if ( nOrig !== n && state . ended ) endReadable ( this ) ;
}
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
if ( ret !== null ) this . emit ( 'data' , ret ) ;
return ret ;
} ;
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
function onEofChunk ( stream , state ) {
debug ( 'onEofChunk' ) ;
if ( state . ended ) return ;
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
if ( state . decoder ) {
var chunk = state . decoder . end ( ) ;
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
if ( chunk && chunk . length ) {
state . buffer . push ( chunk ) ;
state . length += state . objectMode ? 1 : chunk . length ;
}
}
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
state . ended = true ;
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
if ( state . sync ) {
// if we are sync, wait until next tick to emit the data.
// Otherwise we risk emitting data in the flow()
// the readable code triggers during a read() call
emitReadable ( stream ) ;
} else {
// emit 'readable' now to make sure it gets picked up.
state . needReadable = false ;
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
if ( ! state . emittedReadable ) {
state . emittedReadable = true ;
emitReadable _ ( stream ) ;
}
}
2023-07-27 15:04:13 +08:00
} // Don't emit readable right away in sync mode, because this can trigger
2023-07-24 11:13:08 +08:00
// another read() call => stack overflow. This way, it might trigger
// a nextTick recursion warning, but that's not so bad.
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
function emitReadable ( stream ) {
var state = stream . _readableState ;
debug ( 'emitReadable' , state . needReadable , state . emittedReadable ) ;
state . needReadable = false ;
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
if ( ! state . emittedReadable ) {
debug ( 'emitReadable' , state . flowing ) ;
state . emittedReadable = true ;
process . nextTick ( emitReadable _ , stream ) ;
}
}
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
function emitReadable _ ( stream ) {
var state = stream . _readableState ;
debug ( 'emitReadable_' , state . destroyed , state . length , state . ended ) ;
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
if ( ! state . destroyed && ( state . length || state . ended ) ) {
stream . emit ( 'readable' ) ;
state . emittedReadable = false ;
2023-07-27 15:04:13 +08:00
} // The stream needs another readable event if
2023-07-24 11:13:08 +08:00
// 1. It is not flowing, as the flow mechanism will take
// care of it.
// 2. It is not ended.
// 3. It is below the highWaterMark, so we can schedule
// another readable later.
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
state . needReadable = ! state . flowing && ! state . ended && state . length <= state . highWaterMark ;
flow ( stream ) ;
2023-07-27 15:04:13 +08:00
} // at this point, the user has presumably seen the 'readable' event,
2023-07-24 11:13:08 +08:00
// and called read() to consume some data. that may have triggered
// in turn another _read(n) call, in which case reading = true if
// it's in progress.
// However, if we're not ended, or reading, and the length < hwm,
// then go ahead and try to read some more preemptively.
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
function maybeReadMore ( stream , state ) {
if ( ! state . readingMore ) {
state . readingMore = true ;
process . nextTick ( maybeReadMore _ , stream , state ) ;
}
}
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
function maybeReadMore _ ( stream , state ) {
// Attempt to read more data if we should.
//
// The conditions for reading more data are (one of):
// - Not enough data buffered (state.length < state.highWaterMark). The loop
// is responsible for filling the buffer with enough data if such data
// is available. If highWaterMark is 0 and we are not in the flowing mode
// we should _not_ attempt to buffer any extra data. We'll get more data
// when the stream consumer calls read() instead.
// - No data in the buffer, and the stream is in flowing mode. In this mode
// the loop below is responsible for ensuring read() is called. Failing to
// call read here would abort the flow and there's no other mechanism for
// continuing the flow if the stream consumer has just subscribed to the
// 'data' event.
//
// In addition to the above conditions to keep reading data, the following
// conditions prevent the data from being read:
// - The stream has ended (state.ended).
// - There is already a pending 'read' operation (state.reading). This is a
// case where the the stream has called the implementation defined _read()
// method, but they are processing the call asynchronously and have _not_
// called push() with new data. In this case we skip performing more
// read()s. The execution ends in this method again after the _read() ends
// up calling push() with more data.
while ( ! state . reading && ! state . ended && ( state . length < state . highWaterMark || state . flowing && state . length === 0 ) ) {
var len = state . length ;
debug ( 'maybeReadMore read 0' ) ;
stream . read ( 0 ) ;
2023-07-27 15:04:13 +08:00
if ( len === state . length ) // didn't get any data, stop spinning.
2023-07-24 11:13:08 +08:00
break ;
}
2023-07-27 15:04:13 +08:00
state . readingMore = false ;
} // abstract method. to be overridden in specific implementation classes.
2023-07-24 11:13:08 +08:00
// call cb(er, data) where data is <= n in length.
// for virtual (non-string, non-buffer) streams, "length" is somewhat
// arbitrary, and perhaps not very meaningful.
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
Readable . prototype . _read = function ( n ) {
errorOrDestroy ( this , new ERR _METHOD _NOT _IMPLEMENTED ( '_read()' ) ) ;
} ;
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
Readable . prototype . pipe = function ( dest , pipeOpts ) {
var src = this ;
var state = this . _readableState ;
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
switch ( state . pipesCount ) {
case 0 :
state . pipes = dest ;
break ;
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
case 1 :
state . pipes = [ state . pipes , dest ] ;
break ;
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
default :
state . pipes . push ( dest ) ;
break ;
}
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
state . pipesCount += 1 ;
debug ( 'pipe count=%d opts=%j' , state . pipesCount , pipeOpts ) ;
var doEnd = ( ! pipeOpts || pipeOpts . end !== false ) && dest !== process . stdout && dest !== process . stderr ;
var endFn = doEnd ? onend : unpipe ;
if ( state . endEmitted ) process . nextTick ( endFn ) ; else src . once ( 'end' , endFn ) ;
dest . on ( 'unpipe' , onunpipe ) ;
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
function onunpipe ( readable , unpipeInfo ) {
debug ( 'onunpipe' ) ;
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
if ( readable === src ) {
if ( unpipeInfo && unpipeInfo . hasUnpiped === false ) {
unpipeInfo . hasUnpiped = true ;
cleanup ( ) ;
}
}
}
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
function onend ( ) {
debug ( 'onend' ) ;
dest . end ( ) ;
2023-07-27 15:04:13 +08:00
} // when the dest drains, it reduces the awaitDrain counter
2023-07-24 11:13:08 +08:00
// on the source. This would be more elegant with a .once()
// handler in flow(), but adding and removing repeatedly is
// too slow.
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
var ondrain = pipeOnDrain ( src ) ;
dest . on ( 'drain' , ondrain ) ;
var cleanedUp = false ;
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
function cleanup ( ) {
2023-07-27 15:04:13 +08:00
debug ( 'cleanup' ) ; // cleanup event handlers once the pipe is broken
2023-07-24 11:13:08 +08:00
dest . removeListener ( 'close' , onclose ) ;
dest . removeListener ( 'finish' , onfinish ) ;
dest . removeListener ( 'drain' , ondrain ) ;
dest . removeListener ( 'error' , onerror ) ;
dest . removeListener ( 'unpipe' , onunpipe ) ;
src . removeListener ( 'end' , onend ) ;
src . removeListener ( 'end' , unpipe ) ;
src . removeListener ( 'data' , ondata ) ;
2023-07-27 15:04:13 +08:00
cleanedUp = true ; // if the reader is waiting for a drain event from this
2023-07-24 11:13:08 +08:00
// specific writer, then it would cause it to never start
// flowing again.
// So, if this is awaiting a drain, then we just call it now.
// If we don't know, then assume that we are waiting for one.
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
if ( state . awaitDrain && ( ! dest . _writableState || dest . _writableState . needDrain ) ) ondrain ( ) ;
}
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
src . on ( 'data' , ondata ) ;
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
function ondata ( chunk ) {
debug ( 'ondata' ) ;
var ret = dest . write ( chunk ) ;
debug ( 'dest.write' , ret ) ;
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
if ( ret === false ) {
// If the user unpiped during `dest.write()`, it is possible
// to get stuck in a permanently paused state if that write
// also returned false.
// => Check whether `dest` is still a piping destination.
if ( ( state . pipesCount === 1 && state . pipes === dest || state . pipesCount > 1 && indexOf ( state . pipes , dest ) !== - 1 ) && ! cleanedUp ) {
debug ( 'false write response, pause' , state . awaitDrain ) ;
state . awaitDrain ++ ;
}
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
src . pause ( ) ;
}
2023-07-27 15:04:13 +08:00
} // if the dest has an error, then stop piping into it.
2023-07-24 11:13:08 +08:00
// however, don't suppress the throwing behavior for this.
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
function onerror ( er ) {
debug ( 'onerror' , er ) ;
unpipe ( ) ;
dest . removeListener ( 'error' , onerror ) ;
if ( EElistenerCount ( dest , 'error' ) === 0 ) errorOrDestroy ( dest , er ) ;
2023-07-27 15:04:13 +08:00
} // Make sure our error handler is attached before userland ones.
2023-07-24 11:13:08 +08:00
2023-07-27 15:04:13 +08:00
prependListener ( dest , 'error' , onerror ) ; // Both close and finish should trigger unpipe, but only once.
2023-07-24 11:13:08 +08:00
function onclose ( ) {
dest . removeListener ( 'finish' , onfinish ) ;
unpipe ( ) ;
}
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
dest . once ( 'close' , onclose ) ;
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
function onfinish ( ) {
debug ( 'onfinish' ) ;
dest . removeListener ( 'close' , onclose ) ;
unpipe ( ) ;
}
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
dest . once ( 'finish' , onfinish ) ;
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
function unpipe ( ) {
debug ( 'unpipe' ) ;
src . unpipe ( dest ) ;
2023-07-27 15:04:13 +08:00
} // tell the dest that it's being piped to
2023-07-24 11:13:08 +08:00
2023-07-27 15:04:13 +08:00
dest . emit ( 'pipe' , src ) ; // start the flow if it hasn't been started already.
2023-07-24 11:13:08 +08:00
if ( ! state . flowing ) {
debug ( 'pipe resume' ) ;
src . resume ( ) ;
}
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
return dest ;
} ;
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
function pipeOnDrain ( src ) {
return function pipeOnDrainFunctionResult ( ) {
var state = src . _readableState ;
debug ( 'pipeOnDrain' , state . awaitDrain ) ;
if ( state . awaitDrain ) state . awaitDrain -- ;
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
if ( state . awaitDrain === 0 && EElistenerCount ( src , 'data' ) ) {
state . flowing = true ;
flow ( src ) ;
}
} ;
}
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
Readable . prototype . unpipe = function ( dest ) {
var state = this . _readableState ;
var unpipeInfo = {
hasUnpiped : false
2023-07-27 15:04:13 +08:00
} ; // if we're not piping anywhere, then do nothing.
2023-07-24 11:13:08 +08:00
2023-07-27 15:04:13 +08:00
if ( state . pipesCount === 0 ) return this ; // just one destination. most common case.
2023-07-24 11:13:08 +08:00
if ( state . pipesCount === 1 ) {
// passed in one, but it's not the right one.
if ( dest && dest !== state . pipes ) return this ;
2023-07-27 15:04:13 +08:00
if ( ! dest ) dest = state . pipes ; // got a match.
2023-07-24 11:13:08 +08:00
state . pipes = null ;
state . pipesCount = 0 ;
state . flowing = false ;
if ( dest ) dest . emit ( 'unpipe' , this , unpipeInfo ) ;
return this ;
2023-07-27 15:04:13 +08:00
} // slow case. multiple pipe destinations.
2023-07-24 11:13:08 +08:00
if ( ! dest ) {
// remove all.
var dests = state . pipes ;
var len = state . pipesCount ;
state . pipes = null ;
state . pipesCount = 0 ;
state . flowing = false ;
2023-07-27 15:04:13 +08:00
for ( var i = 0 ; i < len ; i ++ ) {
dests [ i ] . emit ( 'unpipe' , this , {
hasUnpiped : false
} ) ;
}
2023-07-24 11:13:08 +08:00
return this ;
2023-07-27 15:04:13 +08:00
} // try to find the right one.
2023-07-24 11:13:08 +08:00
var index = indexOf ( state . pipes , dest ) ;
if ( index === - 1 ) return this ;
state . pipes . splice ( index , 1 ) ;
state . pipesCount -= 1 ;
if ( state . pipesCount === 1 ) state . pipes = state . pipes [ 0 ] ;
dest . emit ( 'unpipe' , this , unpipeInfo ) ;
return this ;
2023-07-27 15:04:13 +08:00
} ; // set up data events if they are asked for
2023-07-24 11:13:08 +08:00
// Ensure readable listeners eventually get something
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
Readable . prototype . on = function ( ev , fn ) {
var res = Stream . prototype . on . call ( this , ev , fn ) ;
var state = this . _readableState ;
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
if ( ev === 'data' ) {
// update readableListening so that resume() may be a no-op
// a few lines down. This is needed to support once('readable').
2023-07-27 15:04:13 +08:00
state . readableListening = this . listenerCount ( 'readable' ) > 0 ; // Try start flowing on next tick if stream isn't explicitly paused
2023-07-24 11:13:08 +08:00
if ( state . flowing !== false ) this . resume ( ) ;
} else if ( ev === 'readable' ) {
if ( ! state . endEmitted && ! state . readableListening ) {
state . readableListening = state . needReadable = true ;
state . flowing = false ;
state . emittedReadable = false ;
debug ( 'on readable' , state . length , state . reading ) ;
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
if ( state . length ) {
emitReadable ( this ) ;
} else if ( ! state . reading ) {
process . nextTick ( nReadingNextTick , this ) ;
}
}
}
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
return res ;
} ;
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
Readable . prototype . addListener = Readable . prototype . on ;
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
Readable . prototype . removeListener = function ( ev , fn ) {
var res = Stream . prototype . removeListener . call ( this , ev , fn ) ;
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
if ( ev === 'readable' ) {
// We need to check if there is someone still listening to
// readable and reset the state. However this needs to happen
// after readable has been emitted but before I/O (nextTick) to
// support once('readable', fn) cycles. This means that calling
// resume within the same tick will have no
// effect.
process . nextTick ( updateReadableListening , this ) ;
}
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
return res ;
} ;
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
Readable . prototype . removeAllListeners = function ( ev ) {
var res = Stream . prototype . removeAllListeners . apply ( this , arguments ) ;
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
if ( ev === 'readable' || ev === undefined ) {
// We need to check if there is someone still listening to
// readable and reset the state. However this needs to happen
// after readable has been emitted but before I/O (nextTick) to
// support once('readable', fn) cycles. This means that calling
// resume within the same tick will have no
// effect.
process . nextTick ( updateReadableListening , this ) ;
}
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
return res ;
} ;
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
function updateReadableListening ( self ) {
var state = self . _readableState ;
state . readableListening = self . listenerCount ( 'readable' ) > 0 ;
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
if ( state . resumeScheduled && ! state . paused ) {
// flowing needs to be set to true now, otherwise
// the upcoming resume will not flow.
2023-07-27 15:04:13 +08:00
state . flowing = true ; // crude way to check if we should resume
2023-07-24 11:13:08 +08:00
} else if ( self . listenerCount ( 'data' ) > 0 ) {
self . resume ( ) ;
}
}
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
function nReadingNextTick ( self ) {
debug ( 'readable nexttick read 0' ) ;
self . read ( 0 ) ;
2023-07-27 15:04:13 +08:00
} // pause() and resume() are remnants of the legacy readable stream API
2023-07-24 11:13:08 +08:00
// If the user uses them, then switch into old mode.
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
Readable . prototype . resume = function ( ) {
var state = this . _readableState ;
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
if ( ! state . flowing ) {
2023-07-27 15:04:13 +08:00
debug ( 'resume' ) ; // we flow only if there is no one listening
2023-07-24 11:13:08 +08:00
// for readable, but we still have to call
// resume()
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
state . flowing = ! state . readableListening ;
resume ( this , state ) ;
}
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
state . paused = false ;
return this ;
} ;
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
function resume ( stream , state ) {
if ( ! state . resumeScheduled ) {
state . resumeScheduled = true ;
process . nextTick ( resume _ , stream , state ) ;
}
}
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
function resume _ ( stream , state ) {
debug ( 'resume' , state . reading ) ;
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
if ( ! state . reading ) {
stream . read ( 0 ) ;
}
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
state . resumeScheduled = false ;
stream . emit ( 'resume' ) ;
flow ( stream ) ;
if ( state . flowing && ! state . reading ) stream . read ( 0 ) ;
}
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
Readable . prototype . pause = function ( ) {
debug ( 'call pause flowing=%j' , this . _readableState . flowing ) ;
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
if ( this . _readableState . flowing !== false ) {
debug ( 'pause' ) ;
this . _readableState . flowing = false ;
this . emit ( 'pause' ) ;
}
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
this . _readableState . paused = true ;
return this ;
} ;
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
function flow ( stream ) {
var state = stream . _readableState ;
debug ( 'flow' , state . flowing ) ;
2023-07-27 15:04:13 +08:00
while ( state . flowing && stream . read ( ) !== null ) {
;
}
} // wrap an old-style stream as the async data source.
2023-07-24 11:13:08 +08:00
// This is *not* part of the readable stream interface.
// It is an ugly unfortunate mess of history.
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
Readable . prototype . wrap = function ( stream ) {
var _this = this ;
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
var state = this . _readableState ;
var paused = false ;
stream . on ( 'end' , function ( ) {
debug ( 'wrapped end' ) ;
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
if ( state . decoder && ! state . ended ) {
var chunk = state . decoder . end ( ) ;
if ( chunk && chunk . length ) _this . push ( chunk ) ;
}
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
_this . push ( null ) ;
} ) ;
stream . on ( 'data' , function ( chunk ) {
debug ( 'wrapped data' ) ;
2023-07-27 15:04:13 +08:00
if ( state . decoder ) chunk = state . decoder . write ( chunk ) ; // don't skip over falsy values in objectMode
2023-07-24 11:13:08 +08:00
if ( state . objectMode && ( chunk === null || chunk === undefined ) ) return ; else if ( ! state . objectMode && ( ! chunk || ! chunk . length ) ) return ;
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
var ret = _this . push ( chunk ) ;
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
if ( ! ret ) {
paused = true ;
stream . pause ( ) ;
}
2023-07-27 15:04:13 +08:00
} ) ; // proxy all the other methods.
2023-07-24 11:13:08 +08:00
// important when wrapping filters and duplexes.
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
for ( var i in stream ) {
if ( this [ i ] === undefined && typeof stream [ i ] === 'function' ) {
this [ i ] = function methodWrap ( method ) {
return function methodWrapReturnFunction ( ) {
return stream [ method ] . apply ( stream , arguments ) ;
} ;
} ( i ) ;
}
2023-07-27 15:04:13 +08:00
} // proxy certain important events.
2023-07-24 11:13:08 +08:00
for ( var n = 0 ; n < kProxyEvents . length ; n ++ ) {
stream . on ( kProxyEvents [ n ] , this . emit . bind ( this , kProxyEvents [ n ] ) ) ;
2023-07-27 15:04:13 +08:00
} // when we try to consume some more bytes, simply unpause the
2023-07-24 11:13:08 +08:00
// underlying stream.
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
this . _read = function ( n ) {
debug ( 'wrapped _read' , n ) ;
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
if ( paused ) {
paused = false ;
stream . resume ( ) ;
}
} ;
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
return this ;
} ;
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
if ( typeof Symbol === 'function' ) {
Readable . prototype [ Symbol . asyncIterator ] = function ( ) {
if ( createReadableStreamAsyncIterator === undefined ) {
createReadableStreamAsyncIterator = require ( './internal/streams/async_iterator' ) ;
}
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
return createReadableStreamAsyncIterator ( this ) ;
} ;
}
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
Object . defineProperty ( Readable . prototype , 'readableHighWaterMark' , {
// making it explicit this property is not enumerable
// because otherwise some prototype manipulation in
// userland will fail
enumerable : false ,
get : function get ( ) {
return this . _readableState . highWaterMark ;
}
} ) ;
Object . defineProperty ( Readable . prototype , 'readableBuffer' , {
// making it explicit this property is not enumerable
// because otherwise some prototype manipulation in
// userland will fail
enumerable : false ,
get : function get ( ) {
return this . _readableState && this . _readableState . buffer ;
}
} ) ;
Object . defineProperty ( Readable . prototype , 'readableFlowing' , {
// making it explicit this property is not enumerable
// because otherwise some prototype manipulation in
// userland will fail
enumerable : false ,
get : function get ( ) {
return this . _readableState . flowing ;
} ,
set : function set ( state ) {
if ( this . _readableState ) {
this . _readableState . flowing = state ;
}
}
2023-07-27 15:04:13 +08:00
} ) ; // exposed for testing purposes only.
2023-07-24 11:13:08 +08:00
Readable . _fromList = fromList ;
Object . defineProperty ( Readable . prototype , 'readableLength' , {
// making it explicit this property is not enumerable
// because otherwise some prototype manipulation in
// userland will fail
enumerable : false ,
get : function get ( ) {
return this . _readableState . length ;
}
2023-07-27 15:04:13 +08:00
} ) ; // Pluck off n bytes from an array of buffers.
2023-07-24 11:13:08 +08:00
// Length is the combined lengths of all the buffers in the list.
// This function is designed to be inlinable, so please take care when making
// changes to the function body.
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
function fromList ( n , state ) {
// nothing buffered
if ( state . length === 0 ) return null ;
var ret ;
if ( state . objectMode ) ret = state . buffer . shift ( ) ; else if ( ! n || n >= state . length ) {
// read it all, truncate the list
if ( state . decoder ) ret = state . buffer . join ( '' ) ; else if ( state . buffer . length === 1 ) ret = state . buffer . first ( ) ; else ret = state . buffer . concat ( state . length ) ;
state . buffer . clear ( ) ;
} else {
// read part of list
ret = state . buffer . consume ( n , state . decoder ) ;
}
return ret ;
}
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
function endReadable ( stream ) {
var state = stream . _readableState ;
debug ( 'endReadable' , state . endEmitted ) ;
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
if ( ! state . endEmitted ) {
state . ended = true ;
process . nextTick ( endReadableNT , state , stream ) ;
}
}
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
function endReadableNT ( state , stream ) {
2023-07-27 15:04:13 +08:00
debug ( 'endReadableNT' , state . endEmitted , state . length ) ; // Check that we didn't get one last unshift.
2023-07-24 11:13:08 +08:00
if ( ! state . endEmitted && state . length === 0 ) {
state . endEmitted = true ;
stream . readable = false ;
stream . emit ( 'end' ) ;
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
if ( state . autoDestroy ) {
// In case of duplex streams we need a way to detect
// if the writable side is ready for autoDestroy as well
var wState = stream . _writableState ;
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
if ( ! wState || wState . autoDestroy && wState . finished ) {
stream . destroy ( ) ;
}
}
}
}
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
if ( typeof Symbol === 'function' ) {
Readable . from = function ( iterable , opts ) {
if ( from === undefined ) {
from = require ( './internal/streams/from' ) ;
}
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
return from ( Readable , iterable , opts ) ;
} ;
}
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
function indexOf ( xs , x ) {
for ( var i = 0 , l = xs . length ; i < l ; i ++ ) {
if ( xs [ i ] === x ) return i ;
}
2023-07-27 15:04:13 +08:00
2023-07-24 11:13:08 +08:00
return - 1 ;
}