Blame view

node_modules/websocket-extensions/lib/pipeline/functor.js 1.48 KB
aaac7fed   liuqimichale   add
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
  'use strict';
  
  var RingBuffer = require('./ring_buffer');
  
  var Functor = function(session, method) {
    this._session = session;
    this._method  = method;
    this._queue   = new RingBuffer(Functor.QUEUE_SIZE);
    this._stopped = false;
    this.pending  = 0;
  };
  
  Functor.QUEUE_SIZE = 8;
  
  Functor.prototype.call = function(error, message, callback, context) {
    if (this._stopped) return;
  
    var record = {error: error, message: message, callback: callback, context: context, done: false},
        called = false,
        self   = this;
  
    this._queue.push(record);
  
    if (record.error) {
      record.done = true;
      this._stop();
      return this._flushQueue();
    }
  
    var handler = function(err, msg) {
      if (!(called ^ (called = true))) return;
  
      if (err) {
        self._stop();
        record.error   = err;
        record.message = null;
      } else {
        record.message = msg;
      }
  
      record.done = true;
      self._flushQueue();
    };
  
    try {
      this._session[this._method](message, handler);
    } catch (err) {
      handler(err);
    }
  };
  
  Functor.prototype._stop = function() {
    this.pending  = this._queue.length;
    this._stopped = true;
  };
  
  Functor.prototype._flushQueue = function() {
    var queue = this._queue, record;
  
    while (queue.length > 0 && queue.peek().done) {
      record = queue.shift();
      if (record.error) {
        this.pending = 0;
        queue.clear();
      } else {
        this.pending -= 1;
      }
      record.callback.call(record.context, record.error, record.message);
    }
  };
  
  module.exports = Functor;