Initial commit
[yaffs-website] / node_modules / block-stream / block-stream.js
1 // write data to it, and it'll emit data in 512 byte blocks.
2 // if you .end() or .flush(), it'll emit whatever it's got,
3 // padded with nulls to 512 bytes.
4
5 module.exports = BlockStream
6
7 var Stream = require("stream").Stream
8   , inherits = require("inherits")
9   , assert = require("assert").ok
10   , debug = process.env.DEBUG ? console.error : function () {}
11
12 function BlockStream (size, opt) {
13   this.writable = this.readable = true
14   this._opt = opt || {}
15   this._chunkSize = size || 512
16   this._offset = 0
17   this._buffer = []
18   this._bufferLength = 0
19   if (this._opt.nopad) this._zeroes = false
20   else {
21     this._zeroes = new Buffer(this._chunkSize)
22     for (var i = 0; i < this._chunkSize; i ++) {
23       this._zeroes[i] = 0
24     }
25   }
26 }
27
28 inherits(BlockStream, Stream)
29
30 BlockStream.prototype.write = function (c) {
31   // debug("   BS write", c)
32   if (this._ended) throw new Error("BlockStream: write after end")
33   if (c && !Buffer.isBuffer(c)) c = new Buffer(c + "")
34   if (c.length) {
35     this._buffer.push(c)
36     this._bufferLength += c.length
37   }
38   // debug("pushed onto buffer", this._bufferLength)
39   if (this._bufferLength >= this._chunkSize) {
40     if (this._paused) {
41       // debug("   BS paused, return false, need drain")
42       this._needDrain = true
43       return false
44     }
45     this._emitChunk()
46   }
47   return true
48 }
49
50 BlockStream.prototype.pause = function () {
51   // debug("   BS pausing")
52   this._paused = true
53 }
54
55 BlockStream.prototype.resume = function () {
56   // debug("   BS resume")
57   this._paused = false
58   return this._emitChunk()
59 }
60
61 BlockStream.prototype.end = function (chunk) {
62   // debug("end", chunk)
63   if (typeof chunk === "function") cb = chunk, chunk = null
64   if (chunk) this.write(chunk)
65   this._ended = true
66   this.flush()
67 }
68
69 BlockStream.prototype.flush = function () {
70   this._emitChunk(true)
71 }
72
73 BlockStream.prototype._emitChunk = function (flush) {
74   // debug("emitChunk flush=%j emitting=%j paused=%j", flush, this._emitting, this._paused)
75
76   // emit a <chunkSize> chunk
77   if (flush && this._zeroes) {
78     // debug("    BS push zeroes", this._bufferLength)
79     // push a chunk of zeroes
80     var padBytes = (this._bufferLength % this._chunkSize)
81     if (padBytes !== 0) padBytes = this._chunkSize - padBytes
82     if (padBytes > 0) {
83       // debug("padBytes", padBytes, this._zeroes.slice(0, padBytes))
84       this._buffer.push(this._zeroes.slice(0, padBytes))
85       this._bufferLength += padBytes
86       // debug(this._buffer[this._buffer.length - 1].length, this._bufferLength)
87     }
88   }
89
90   if (this._emitting || this._paused) return
91   this._emitting = true
92
93   // debug("    BS entering loops")
94   var bufferIndex = 0
95   while (this._bufferLength >= this._chunkSize &&
96          (flush || !this._paused)) {
97     // debug("     BS data emission loop", this._bufferLength)
98
99     var out
100       , outOffset = 0
101       , outHas = this._chunkSize
102
103     while (outHas > 0 && (flush || !this._paused) ) {
104       // debug("    BS data inner emit loop", this._bufferLength)
105       var cur = this._buffer[bufferIndex]
106         , curHas = cur.length - this._offset
107       // debug("cur=", cur)
108       // debug("curHas=%j", curHas)
109       // If it's not big enough to fill the whole thing, then we'll need
110       // to copy multiple buffers into one.  However, if it is big enough,
111       // then just slice out the part we want, to save unnecessary copying.
112       // Also, need to copy if we've already done some copying, since buffers
113       // can't be joined like cons strings.
114       if (out || curHas < outHas) {
115         out = out || new Buffer(this._chunkSize)
116         cur.copy(out, outOffset,
117                  this._offset, this._offset + Math.min(curHas, outHas))
118       } else if (cur.length === outHas && this._offset === 0) {
119         // shortcut -- cur is exactly long enough, and no offset.
120         out = cur
121       } else {
122         // slice out the piece of cur that we need.
123         out = cur.slice(this._offset, this._offset + outHas)
124       }
125
126       if (curHas > outHas) {
127         // means that the current buffer couldn't be completely output
128         // update this._offset to reflect how much WAS written
129         this._offset += outHas
130         outHas = 0
131       } else {
132         // output the entire current chunk.
133         // toss it away
134         outHas -= curHas
135         outOffset += curHas
136         bufferIndex ++
137         this._offset = 0
138       }
139     }
140
141     this._bufferLength -= this._chunkSize
142     assert(out.length === this._chunkSize)
143     // debug("emitting data", out)
144     // debug("   BS emitting, paused=%j", this._paused, this._bufferLength)
145     this.emit("data", out)
146     out = null
147   }
148   // debug("    BS out of loops", this._bufferLength)
149
150   // whatever is left, it's not enough to fill up a block, or we're paused
151   this._buffer = this._buffer.slice(bufferIndex)
152   if (this._paused) {
153     // debug("    BS paused, leaving", this._bufferLength)
154     this._needsDrain = true
155     this._emitting = false
156     return
157   }
158
159   // if flushing, and not using null-padding, then need to emit the last
160   // chunk(s) sitting in the queue.  We know that it's not enough to
161   // fill up a whole block, because otherwise it would have been emitted
162   // above, but there may be some offset.
163   var l = this._buffer.length
164   if (flush && !this._zeroes && l) {
165     if (l === 1) {
166       if (this._offset) {
167         this.emit("data", this._buffer[0].slice(this._offset))
168       } else {
169         this.emit("data", this._buffer[0])
170       }
171     } else {
172       var outHas = this._bufferLength
173         , out = new Buffer(outHas)
174         , outOffset = 0
175       for (var i = 0; i < l; i ++) {
176         var cur = this._buffer[i]
177           , curHas = cur.length - this._offset
178         cur.copy(out, outOffset, this._offset)
179         this._offset = 0
180         outOffset += curHas
181         this._bufferLength -= curHas
182       }
183       this.emit("data", out)
184     }
185     // truncate
186     this._buffer.length = 0
187     this._bufferLength = 0
188     this._offset = 0
189   }
190
191   // now either drained or ended
192   // debug("either draining, or ended", this._bufferLength, this._ended)
193   // means that we've flushed out all that we can so far.
194   if (this._needDrain) {
195     // debug("emitting drain", this._bufferLength)
196     this._needDrain = false
197     this.emit("drain")
198   }
199
200   if ((this._bufferLength === 0) && this._ended && !this._endEmitted) {
201     // debug("emitting end", this._bufferLength)
202     this._endEmitted = true
203     this.emit("end")
204   }
205
206   this._emitting = false
207
208   // debug("    BS no longer emitting", flush, this._paused, this._emitting, this._bufferLength, this._chunkSize)
209 }