123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136 |
- var stream = require("stream"),
- constants = require("./constants"),
- util = require("util");
-
- var ReadableStreamBuffer = module.exports = function(opts) {
- var that = this;
-
- stream.Stream.call(this);
-
- opts = opts || {};
- var frequency = opts.hasOwnProperty("frequency") ? opts.frequency : constants.DEFAULT_FREQUENCY;
- var chunkSize = opts.chunkSize || constants.DEFAULT_CHUNK_SIZE;
- var initialSize = opts.initialSize || constants.DEFAULT_INITIAL_SIZE;
- var incrementAmount = opts.incrementAmount || constants.DEFAULT_INCREMENT_AMOUNT;
-
- var size = 0;
- var buffer = new Buffer(initialSize);
- var encoding = null;
-
- this.readable = true;
- this.writable = false;
-
- var sendData = function() {
- var amount = Math.min(chunkSize, size);
-
- if (amount > 0) {
- var chunk = null;
- if(encoding) {
- chunk = buffer.toString(encoding, 0, amount);
- }
- else {
- chunk = new Buffer(amount);
- buffer.copy(chunk, 0, 0, amount);
- }
-
- that.emit("data", chunk);
-
- if(amount < buffer.length)
- buffer.copy(buffer, 0, amount, size);
- size -= amount;
- }
-
- if(size === 0 && !that.readable) {
- that.emit("end");
- that.emit("close");
- if (sendData && sendData.interval) {
- clearInterval(sendData.interval);
- sendData.interval = null;
- }
- }
- };
-
- this.size = function() {
- return size;
- };
-
- this.maxSize = function() {
- return buffer.length;
- };
-
- var increaseBufferIfNecessary = function(incomingDataSize) {
- if((buffer.length - size) < incomingDataSize) {
- var factor = Math.ceil((incomingDataSize - (buffer.length - size)) / incrementAmount);
-
- var newBuffer = new Buffer(buffer.length + (incrementAmount * factor));
- buffer.copy(newBuffer, 0, 0, size);
- buffer = newBuffer;
- }
- };
-
- this.put = function(data, encoding) {
- if(!that.readable) return;
-
- var wasEmpty = size === 0;
- if(Buffer.isBuffer(data)) {
- increaseBufferIfNecessary(data.length);
- data.copy(buffer, size, 0);
- size += data.length;
- }
- else {
- data = data + "";
- var dataSizeInBytes = Buffer.byteLength(data);
- increaseBufferIfNecessary(dataSizeInBytes);
- buffer.write(data, size, encoding || "utf8");
- size += dataSizeInBytes;
- }
-
- if (wasEmpty && size > 0) {
- this.emit('readable')
- }
-
- if (!this.isPaused && !frequency) {
- while (size > 0) {
- sendData();
- }
- }
- };
-
- this.pause = function() {
- this.isPaused = true;
- if(sendData && sendData.interval) {
- clearInterval(sendData.interval);
- delete sendData.interval;
- }
- };
-
- this.resume = function() {
- this.isPaused = false;
- if(sendData && !sendData.interval && frequency > 0) {
- sendData.interval = setInterval(sendData, frequency);
- }
- };
-
- this.destroy = function() {
- that.emit("end");
- if(sendData.interval) clearInterval(sendData.interval);
- sendData = null;
- that.readable = false;
- that.emit("close");
- };
-
- this.destroySoon = function() {
- that.readable = false;
- if (!sendData.interval) {
- that.emit("end");
- that.emit("close");
- }
- };
-
- this.setEncoding = function(_encoding) {
- encoding = _encoding;
- };
-
- this.resume();
- };
- util.inherits(ReadableStreamBuffer, stream.Stream);
|