Blame view

node_modules/fstream/lib/collect.js 1.75 KB
aaac7fed   liuqimichale   add
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
  // The module's export is the collect() function itself.
  module.exports = collect
  
  /**
   * Pause `stream` and buffer what it emits until someone pipes it.
   *
   * While collected, every 'data' chunk and the 'end' signal are stored, every
   * 'entry' is itself collected (recursively) and queued, and every 'proxy'
   * target is paused. `stream.pipe` is temporarily replaced; the first call to
   * it releases the queued entries one at a time, restores the original
   * `pipe`, replays the stored 'data'/'end' events and resumes the stream.
   *
   * @param {object} stream - An event-emitter style stream exposing `on`,
   *   `once`, `removeListener`, `emit`, `pause`, `resume` and `pipe`, plus the
   *   `_paused` flag read below. Marked with `_collected = true` once set up.
   * @returns {*} `undefined` normally; when the stream is already paused, the
   *   return value of `stream.once(...)` (collection is deferred to 'resume').
   */
  function collect (stream) {
    // Collecting twice would double-buffer every event.
    if (stream._collected) return

    // The stream is already paused: defer collection until it resumes.
    // `once` keeps the deferred call from lingering as a listener that
    // would fire (as a no-op) on every later 'resume'.
    if (stream._paused) return stream.once('resume', collect.bind(null, stream))

    stream._collected = true
    stream.pause()

    stream.on('data', save)
    stream.on('end', save)
    var buf = []
    // Records one replayable event. A truthy value is replayed as 'data';
    // a falsy one (what 'end' delivers when it carries no argument) is
    // replayed as 'end'.
    function save (b) {
      // Buffer.from replaces the deprecated `new Buffer(string)` constructor.
      if (typeof b === 'string') b = Buffer.from(b)
      // Zero-length buffers carry nothing worth replaying.
      if (Buffer.isBuffer(b) && !b.length) return
      buf.push(b)
    }

    stream.on('entry', saveEntry)
    var entryBuffer = []
    // Child entries are collected too, so their own events are held back
    // until they are handed on.
    function saveEntry (e) {
      collect(e)
      entryBuffer.push(e)
    }

    stream.on('proxy', proxyPause)
    function proxyPause (p) {
      p.pause()
    }

    // replace the pipe method with a new version that will
    // unlock the buffered stuff.  if you just call .pipe()
    // without a destination, then it'll re-play the events.
    stream.pipe = (function (orig) {
      return function (dest) {
        // let the entries flow through one at a time.
        // Once they're all done, then we can resume completely.
        var e = 0
        ;(function unblockEntry () {
          var entry = entryBuffer[e++]
          if (!entry) return resume()
          // The next entry is released only after this one has ended.
          entry.on('end', unblockEntry)
          if (dest) dest.add(entry)
          else stream.emit('entry', entry)
        })()

        function resume () {
          // Stop buffering before replaying, otherwise the replayed events
          // would be captured again.
          stream.removeListener('entry', saveEntry)
          stream.removeListener('data', save)
          stream.removeListener('end', save)

          // Restore the real pipe first so the call below reaches it.
          stream.pipe = orig
          if (dest) stream.pipe(dest)

          buf.forEach(function (b) {
            if (b) stream.emit('data', b)
            else stream.emit('end')
          })

          stream.resume()
        }

        return dest
      }
    })(stream.pipe)
  }