Açıklama Yok

readable_streambuffer.js 3.2KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136
  1. var stream = require("stream"),
  2. constants = require("./constants"),
  3. util = require("util");
  /**
   * Classic (event-emitting) readable stream backed by a growable in-memory
   * Buffer. Data is appended with `put()` and handed to "data" listeners in
   * chunks of at most `chunkSize` bytes, either on a timer (`frequency` > 0,
   * in milliseconds) or synchronously from `put()`/`resume()` (`frequency`
   * falsy).
   *
   * Events emitted: "readable" (buffer went from empty to non-empty), "data",
   * "end" and "close" (each of the last two at most once).
   *
   * @param {Object} [opts]
   * @param {number} [opts.frequency] Delivery interval in ms; a falsy value
   *   selects synchronous delivery. Defaults to constants.DEFAULT_FREQUENCY.
   * @param {number} [opts.chunkSize] Maximum bytes per "data" event.
   *   Defaults to constants.DEFAULT_CHUNK_SIZE.
   * @param {number} [opts.initialSize] Initial capacity of the internal buffer.
   *   Defaults to constants.DEFAULT_INITIAL_SIZE.
   * @param {number} [opts.incrementAmount] Granularity by which the internal
   *   buffer grows. Defaults to constants.DEFAULT_INCREMENT_AMOUNT.
   */
  var ReadableStreamBuffer = module.exports = function(opts) {
    var that = this;

    stream.Stream.call(this);

    opts = opts || {};
    // Use the prototype method so option objects without a prototype
    // (Object.create(null)) are accepted too.
    var hasFrequency = Object.prototype.hasOwnProperty.call(opts, "frequency");
    var frequency = hasFrequency ? opts.frequency : constants.DEFAULT_FREQUENCY;
    var chunkSize = opts.chunkSize || constants.DEFAULT_CHUNK_SIZE;
    var initialSize = opts.initialSize || constants.DEFAULT_INITIAL_SIZE;
    var incrementAmount = opts.incrementAmount || constants.DEFAULT_INCREMENT_AMOUNT;

    // Allocate a zero-filled buffer. `new Buffer(n)` is deprecated; fall back to
    // it only on runtimes that predate Buffer.alloc.
    function allocate(length) {
      return typeof Buffer.alloc === "function" ? Buffer.alloc(length) : new Buffer(length);
    }

    var size = 0;                        // number of valid bytes at the start of `buffer`
    var buffer = allocate(initialSize);
    var encoding = null;                 // set via setEncoding(); null => emit Buffers
    var timer = null;                    // interval handle while timed delivery is active
    var destroyed = false;               // destroy() has been called
    var finished = false;                // "end"/"close" have been emitted

    this.readable = true;
    this.writable = false;

    // Stop timed delivery if it is running.
    var stopTimer = function() {
      if (timer) {
        clearInterval(timer);
        timer = null;
      }
    };

    // Emit "end" and "close" exactly once and stop the delivery timer.
    var finish = function() {
      if (finished) return;
      finished = true;
      stopTimer();
      that.emit("end");
      that.emit("close");
    };

    // Deliver at most one chunk, then finish the stream if it has been closed
    // for input (readable === false) and the buffer is drained.
    var sendData = function() {
      var amount = Math.min(chunkSize, size);

      if (amount > 0) {
        var chunk;
        if (encoding) {
          // NOTE: the chunk boundary is a byte offset, so a multi-byte character
          // may be split across two "data" events when an encoding is set.
          chunk = buffer.toString(encoding, 0, amount);
        } else {
          chunk = allocate(amount);
          buffer.copy(chunk, 0, 0, amount);
        }

        // Consume the bytes BEFORE emitting so that listeners which call put(),
        // pause() or destroy() re-entrantly observe a consistent buffer.
        var remaining = size - amount;
        if (remaining > 0) {
          buffer.copy(buffer, 0, amount, size);
        }
        size = remaining;

        that.emit("data", chunk);
      }

      if (size === 0 && !that.readable) {
        finish();
      }
    };

    // Synchronously deliver everything that is buffered, stopping early if a
    // listener pauses or destroys the stream.
    var flush = function() {
      while (size > 0 && !destroyed && !that.isPaused) {
        sendData();
      }
    };

    /** @returns {number} Number of bytes currently buffered. */
    this.size = function() {
      return size;
    };

    /** @returns {number} Current capacity of the internal buffer in bytes. */
    this.maxSize = function() {
      return buffer.length;
    };

    // Grow the internal buffer, in multiples of `incrementAmount`, until it can
    // hold `incomingDataSize` more bytes.
    var increaseBufferIfNecessary = function(incomingDataSize) {
      var free = buffer.length - size;
      if (free < incomingDataSize) {
        var factor = Math.ceil((incomingDataSize - free) / incrementAmount);
        var newBuffer = allocate(buffer.length + (incrementAmount * factor));
        buffer.copy(newBuffer, 0, 0, size);
        buffer = newBuffer;
      }
    };

    /**
     * Append data to the stream. Ignored once the stream is no longer readable.
     *
     * @param {Buffer|*} data A Buffer, or any value that is converted to a string.
     * @param {string} [encoding] Encoding used for string data (default "utf8").
     */
    this.put = function(data, encoding) {
      if (!that.readable) return;

      var wasEmpty = size === 0;

      if (Buffer.isBuffer(data)) {
        increaseBufferIfNecessary(data.length);
        data.copy(buffer, size, 0);
        size += data.length;
      } else {
        var text = data + "";
        var textEncoding = encoding || "utf8";
        // Measure with the same encoding that is used for the write; otherwise
        // `size` drifts from the bytes actually stored for non-UTF-8 input.
        var dataSizeInBytes = Buffer.byteLength(text, textEncoding);
        increaseBufferIfNecessary(dataSizeInBytes);
        buffer.write(text, size, textEncoding);
        size += dataSizeInBytes;
      }

      if (wasEmpty && size > 0) {
        that.emit("readable");
      }

      // Synchronous mode: deliver immediately unless paused.
      if (!frequency) {
        flush();
      }
    };

    /** Suspend delivery of "data" events; buffered data is retained. */
    this.pause = function() {
      that.isPaused = true;
      stopTimer();
    };

    /**
     * Resume delivery: restarts the timer in timed mode, or flushes anything
     * buffered while paused in synchronous mode. No-op after the stream ended.
     */
    this.resume = function() {
      that.isPaused = false;
      if (destroyed || finished) return;

      if (frequency > 0) {
        if (!timer) {
          timer = setInterval(sendData, frequency);
        }
      } else if (!frequency) {
        flush();
      }
    };

    /**
     * Close the stream immediately, discarding undelivered data. Emits "end"
     * and "close" unless they were already emitted. Safe to call repeatedly.
     */
    this.destroy = function() {
      if (destroyed) return;
      destroyed = true;
      that.readable = false;
      stopTimer();
      finish();
    };

    /**
     * Stop accepting data and end the stream once buffered data has been
     * delivered. If nothing is buffered and no timer is running, the stream
     * ends immediately; with a running timer it ends on a later tick.
     */
    this.destroySoon = function() {
      that.readable = false;
      if (!timer && size === 0) {
        finish();
      }
    };

    /**
     * Emit strings decoded with the given encoding instead of Buffers.
     * @param {string} _encoding
     */
    this.setEncoding = function(_encoding) {
      encoding = _encoding;
    };

    this.resume();
  };
  util.inherits(ReadableStreamBuffer, stream.Stream);