Без опису

NodejsStreamInputAdapter.js 1.8KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374
  1. "use strict";
  2. var utils = require('../utils');
  3. var GenericWorker = require('../stream/GenericWorker');
  4. /**
  5. * A worker that use a nodejs stream as source.
  6. * @constructor
  7. * @param {String} filename the name of the file entry for this stream.
  8. * @param {Readable} stream the nodejs stream.
  9. */
  10. function NodejsStreamInputAdapter(filename, stream) {
  11. GenericWorker.call(this, "Nodejs stream input adapter for " + filename);
  12. this._upstreamEnded = false;
  13. this._bindStream(stream);
  14. }
  15. utils.inherits(NodejsStreamInputAdapter, GenericWorker);
  16. /**
  17. * Prepare the stream and bind the callbacks on it.
  18. * Do this ASAP on node 0.10 ! A lazy binding doesn't always work.
  19. * @param {Stream} stream the nodejs stream to use.
  20. */
  21. NodejsStreamInputAdapter.prototype._bindStream = function (stream) {
  22. var self = this;
  23. this._stream = stream;
  24. stream.pause();
  25. stream
  26. .on("data", function (chunk) {
  27. self.push({
  28. data: chunk,
  29. meta : {
  30. percent : 0
  31. }
  32. });
  33. })
  34. .on("error", function (e) {
  35. if(self.isPaused) {
  36. this.generatedError = e;
  37. } else {
  38. self.error(e);
  39. }
  40. })
  41. .on("end", function () {
  42. if(self.isPaused) {
  43. self._upstreamEnded = true;
  44. } else {
  45. self.end();
  46. }
  47. });
  48. };
  49. NodejsStreamInputAdapter.prototype.pause = function () {
  50. if(!GenericWorker.prototype.pause.call(this)) {
  51. return false;
  52. }
  53. this._stream.pause();
  54. return true;
  55. };
  56. NodejsStreamInputAdapter.prototype.resume = function () {
  57. if(!GenericWorker.prototype.resume.call(this)) {
  58. return false;
  59. }
  60. if(this._upstreamEnded) {
  61. this.end();
  62. } else {
  63. this._stream.resume();
  64. }
  65. return true;
  66. };
  67. module.exports = NodejsStreamInputAdapter;