// Repository web-page residue captured together with this file (not part of the module):
//   "You cannot select more than 25 topics. Topics must start with a letter or number,
//   can include dashes ('-') and can be up to 35 characters long."
//   Listed size: 209 lines, 6.0 KiB.
'use strict'; |
|
|
|
var STREAM = require('stream'), |
|
UTIL = require('util'), |
|
StringDecoder = require('string_decoder').StringDecoder; |
|
|
|
/**
 * Readable stream backed by an in-memory queue.
 *
 * May be called with or without `new`. Runs the parent `stream.Readable`
 * constructor with `options`, then passes `data` and `options` to `init`.
 *
 * @param {*} [data] - Initial content, forwarded to `init`.
 * @param {Object} [options] - Forwarded to `stream.Readable` and to `init`.
 */
function MemoryReadableStream(data, options) {
    if (!(this instanceof MemoryReadableStream)) {
        return new MemoryReadableStream(data, options);
    }
    // `super_` is installed by UTIL.inherits below.
    MemoryReadableStream.super_.call(this, options);
    this.init(data, options);
}
UTIL.inherits(MemoryReadableStream, STREAM.Readable);
|
/**
 * Writable stream backed by an in-memory queue.
 *
 * May be called with or without `new`. Runs the parent `stream.Writable`
 * constructor with `options`, then passes `data` and `options` to `init`.
 *
 * @param {*} [data] - Initial content, forwarded to `init`.
 * @param {Object} [options] - Forwarded to `stream.Writable` and to `init`.
 */
function MemoryWritableStream(data, options) {
    if (!(this instanceof MemoryWritableStream)) {
        return new MemoryWritableStream(data, options);
    }
    // `super_` is installed by UTIL.inherits below.
    MemoryWritableStream.super_.call(this, options);
    this.init(data, options);
}
UTIL.inherits(MemoryWritableStream, STREAM.Writable);
|
/**
 * Duplex (readable and writable) stream backed by an in-memory queue.
 *
 * May be called with or without `new`. Runs the parent `stream.Duplex`
 * constructor with `options`, then passes `data` and `options` to `init`.
 *
 * @param {*} [data] - Initial content, forwarded to `init`.
 * @param {Object} [options] - Forwarded to `stream.Duplex` and to `init`.
 */
function MemoryDuplexStream(data, options) {
    if (!(this instanceof MemoryDuplexStream)) {
        return new MemoryDuplexStream(data, options);
    }
    // `super_` is installed by UTIL.inherits below.
    MemoryDuplexStream.super_.call(this, options);
    this.init(data, options);
}
UTIL.inherits(MemoryDuplexStream, STREAM.Duplex);
|
/**
 * Shared initializer for all three stream flavours.
 *
 * Resets `this.queue`, fills it with `data` (each entry converted to a
 * Buffer unless it already is one) and copies the memory-stream specific
 * options onto the instance.
 *
 * @param {*} [data] - A single chunk or an array of chunks; falsy values
 *     leave the queue empty.
 * @param {Object} [options] - Only own properties are read:
 *     `maxbufsize`, `bufoverflow` and `frequence`. Each defaults to `null`.
 */
MemoryReadableStream.prototype.init =
MemoryWritableStream.prototype.init =
MemoryDuplexStream.prototype.init = function init(data, options) {
    var self = this,
        // Borrowed so that prototype-less option objects work as well.
        hasOwn = Object.prototype.hasOwnProperty;

    this.queue = [];

    if (data) {
        (Array.isArray(data) ? data : [data]).forEach(function (chunk) {
            if (!Buffer.isBuffer(chunk)) {
                // `new Buffer()` is deprecated. Buffer.from() rejects numbers,
                // so the old "number means size" meaning is kept via Buffer.alloc().
                chunk = typeof chunk === 'number' ? Buffer.alloc(chunk)
                                                  : Buffer.from(chunk);
            }
            self.queue.push(chunk);
        });
    }

    options = options || {};

    this.maxbufsize = hasOwn.call(options, 'maxbufsize') ? options.maxbufsize : null;
    this.bufoverflow = hasOwn.call(options, 'bufoverflow') ? options.bufoverflow : null;
    this.frequence = hasOwn.call(options, 'frequence') ? options.frequence : null;
};
|
/**
 * Factory for in-memory streams.
 *
 * May be called with or without `new`. Depending on the own `readable` and
 * `writable` options (both default to `true`) it returns a
 * MemoryDuplexStream, MemoryReadableStream or MemoryWritableStream.
 *
 * @param {*} [data] - Initial content, forwarded to the chosen constructor.
 * @param {Object} [options] - Forwarded to the chosen constructor.
 * @returns {MemoryDuplexStream|MemoryReadableStream|MemoryWritableStream}
 * @throws {Error} When both `readable` and `writable` are falsy.
 */
function MemoryStream(data, options) {
    if (!(this instanceof MemoryStream)) {
        return new MemoryStream(data, options);
    }

    options = options || {};

    // Borrowed so that prototype-less option objects work as well.
    var hasOwn = Object.prototype.hasOwnProperty,
        readable = hasOwn.call(options, 'readable') ? options.readable : true,
        writable = hasOwn.call(options, 'writable') ? options.writable : true;

    if (readable && writable) {
        return new MemoryDuplexStream(data, options);
    }
    if (readable) {
        return new MemoryReadableStream(data, options);
    }
    if (writable) {
        return new MemoryWritableStream(data, options);
    }
    throw new Error("Unknown stream type Readable, Writable or Duplex ");
}
|
/**
 * Create a read-only memory stream.
 *
 * The caller's `options` object is copied rather than modified; the copy
 * gets `readable: true` and `writable: false` before being handed to
 * MemoryStream.
 *
 * @param {*} [data] - Initial content, forwarded to MemoryStream.
 * @param {Object} [options] - Extra options; own enumerable properties are copied.
 * @returns {*} Whatever `new MemoryStream(data, options)` yields.
 */
MemoryStream.createReadStream = function (data, options) {
    var settings = Object.assign({}, options, {
        readable: true,
        writable: false
    });

    return new MemoryStream(data, settings);
};
|
/**
 * Create a write-only memory stream.
 *
 * The caller's `options` object is copied rather than modified; the copy
 * gets `readable: false` and `writable: true` before being handed to
 * MemoryStream.
 *
 * @param {*} [data] - Initial content, forwarded to MemoryStream.
 * @param {Object} [options] - Extra options; own enumerable properties are copied.
 * @returns {*} Whatever `new MemoryStream(data, options)` yields.
 */
MemoryStream.createWriteStream = function (data, options) {
    var settings = Object.assign({}, options, {
        readable: false,
        writable: true
    });

    return new MemoryStream(data, settings);
};
|
/**
 * Readable-side implementation: hands out one queued chunk per call.
 *
 * - Empty queue and no more writes possible: ends the stream with `push(null)`.
 * - Empty queue on a Duplex whose writable side has not finished: does nothing.
 * - Otherwise: after `this.frequence` milliseconds (0 when unset) the first
 *   queued chunk is removed and pushed, unless the readable side has already
 *   ended by then.
 *
 * @param {number} n - Size hint supplied by the stream machinery; unused.
 */
MemoryReadableStream.prototype._read =
MemoryDuplexStream.prototype._read = function _read(n) {
    var self = this,
        delay = this.frequence || 0,
        // A duplex stream can still receive data until its writable side finishes.
        awaitingWrites = this instanceof STREAM.Duplex && !this._writableState.finished;

    if (!this.queue.length) {
        if (!awaitingWrites) {
            this.push(null); // finish stream
        }
        return;
    }

    setTimeout(function () {
        if (!self.queue.length) {
            return;
        }
        var chunk = self.queue.shift();
        if (chunk && !self._readableState.ended) {
            // push() returning false is only a backpressure hint: the chunk has
            // already been buffered. Re-queueing it would deliver it twice.
            self.push(chunk);
        }
    }, delay);
};
|
/**
 * Writable-side implementation: stores or forwards one written chunk.
 *
 * When `this.decodeStrings` and `encoding` are both truthy the chunk is run
 * through a fresh StringDecoder first; a decoder construction failure is
 * reported through `cb`.
 *
 * If `this.maxbufsize` is set and the queue size plus the chunk length would
 * exceed it, the chunk is dropped: with `this.bufoverflow` truthy `cb`
 * receives an overflow message (a string), otherwise `cb` is called without
 * an error.
 *
 * On a Duplex the queue is drained into the readable side followed by the new
 * chunk; on a plain Writable the chunk is appended to the queue.
 *
 * @param {Buffer|string} chunk - Data handed over by the stream machinery.
 * @param {string} encoding - Encoding reported for `chunk`.
 * @param {Function} cb - Completion callback, `cb(err)`.
 */
MemoryWritableStream.prototype._write =
MemoryDuplexStream.prototype._write = function _write(chunk, encoding, cb) {
    var decoder = null,
        payload,
        queuedSize;

    try {
        if (this.decodeStrings && encoding) {
            decoder = new StringDecoder(encoding);
        }
    } catch (err) {
        return cb(err);
    }

    payload = decoder ? decoder.write(chunk) : chunk;
    queuedSize = this._getQueueSize();

    if (this.maxbufsize && (queuedSize + payload.length) > this.maxbufsize) {
        if (this.bufoverflow) {
            // Kept as a plain string so existing error handlers see the same value.
            return cb("Buffer overflowed (" + this.bufoverflow + "/" + queuedSize + ")");
        }
        return cb();
    }

    if (this instanceof STREAM.Duplex) {
        // Flush anything still queued first so the output order is preserved.
        while (this.queue.length) {
            this.push(this.queue.shift());
        }
        this.push(payload);
    } else {
        this.queue.push(payload);
    }
    cb();
};
|
/**
 * Finish the writable side and, once that has completed, end the readable
 * side as well.
 *
 * Accepts the same call shapes as `stream.Writable#end`: `end()`, `end(cb)`,
 * `end(chunk)`, `end(chunk, cb)` and `end(chunk, encoding, cb)`. The
 * arguments are normalised here so the internal wrapper always occupies the
 * callback slot; otherwise a callback passed in an earlier position would
 * displace it and the readable side would never be ended.
 *
 * @param {*} [chunk] - Final chunk to write.
 * @param {string} [encoding] - Encoding of `chunk`.
 * @param {Function} [cb] - Called after the readable side has been ended.
 * @returns {*} The parent `end` implementation's return value.
 */
MemoryDuplexStream.prototype.end = function (chunk, encoding, cb) {
    var self = this;

    if (typeof chunk === 'function') {
        cb = chunk;
        chunk = null;
        encoding = null;
    } else if (typeof encoding === 'function') {
        cb = encoding;
        encoding = null;
    }

    return MemoryDuplexStream.super_.prototype.end.call(this, chunk, encoding, function () {
        self.push(null); // finish readable stream too
        if (typeof cb === 'function') {
            cb();
        }
    });
};
|
/**
 * Sum of the `length` of every queued entry.
 *
 * An entry that is itself an array is measured by the `length` of its first
 * element.
 *
 * @returns {number} Combined length; 0 for an empty queue.
 */
MemoryReadableStream.prototype._getQueueSize =
MemoryWritableStream.prototype._getQueueSize =
MemoryDuplexStream.prototype._getQueueSize = function () {
    return this.queue.reduce(function (total, entry) {
        var chunk = Array.isArray(entry) ? entry[0] : entry;
        return total + chunk.length;
    }, 0);
};
|
/**
 * Return everything currently queued as a single string (also exposed as
 * `toString`).
 *
 * Buffer entries are decoded as UTF-8 through one StringDecoder, so a
 * multi-byte character split across consecutive Buffer chunks is decoded
 * correctly. Any other entry is appended using normal string concatenation.
 *
 * @returns {string} Concatenated queue content; '' for an empty queue.
 */
MemoryWritableStream.prototype.toString =
MemoryDuplexStream.prototype.toString =
MemoryReadableStream.prototype.toString =
MemoryWritableStream.prototype.getAll =
MemoryDuplexStream.prototype.getAll =
MemoryReadableStream.prototype.getAll = function () {
    var decoder = new StringDecoder('utf8'),
        text = '';

    this.queue.forEach(function (data) {
        if (Buffer.isBuffer(data)) {
            text += decoder.write(data);
            return;
        }
        // Flush bytes still pending from earlier Buffers so the output stays in
        // order, then start over with a fresh decoder.
        text += decoder.end();
        decoder = new StringDecoder('utf8');
        text += data;
    });

    return text + decoder.end();
};
|
/**
 * Return everything currently queued as one newly allocated Buffer.
 *
 * Entries that are not Buffers are converted with `Buffer.from` first, so the
 * result is sized by byte length and non-ASCII strings are neither truncated
 * nor misplaced.
 *
 * @returns {Buffer} Concatenated queue content; an empty Buffer for an empty queue.
 */
MemoryWritableStream.prototype.toBuffer =
MemoryDuplexStream.prototype.toBuffer =
MemoryReadableStream.prototype.toBuffer = function () {
    var parts = this.queue.map(function (data) {
        return Buffer.isBuffer(data) ? data : Buffer.from(data);
    });

    return Buffer.concat(parts);
};
|
module.exports = MemoryStream;
|
|
|