Nodejs-异步并发控制

发布于:2024-05-05 ⋅ 阅读:(27) ⋅ 点赞:(0)

异步并发控制

在 node 中可以利用异步发起并行调用。但是如果并发量过大,就会导致下层服务器吃不消。

bagpipe 解决方案

解决方案

  • 通过一个队列来控制并发量
  • 如果当前活跃的异步调用小于限定值,从队列中取出执行
  • 如果活跃调用达到限定值,调用暂时放到队列中
  • 每个异步调用结束的时候,从队列中取出新的异步调用执行
var Bagpipe = require("bagpipe");
var bagpipe = new Bagpipe(10);
for (var i = 0; i < 100; i++) {
  bagpipe.push(async, function () {
    // 异步回调执行
  });
}
bagpipe.on("full", function (lenght) {
  console.warn("底层系统不能及时完成,队列拥堵");
});

核心实现

Bagpipe.prototype.push = function (method) {
  var args = [].slice.call(arguments, 1);
  var callback = args[args.length - 1];
  if (typeof callback !== "function") {
    args.push(function () {});
  }
  if (this.options.disabled || this.limit < 1) {
    method.apply(null, args);
    return this;
  }
  if (this.queue.length < this.queueLength || !this.options.refuse) {
    this.queue.push({
      method: method,
      args: args,
    });
  } else {
    var err = new Error("too much async call in queue");
    err.name = "TooMuchAsyncCallError";
    callback(err);
  }
  if (this.queue.length > 1) {
    this.emit("full", this.queue.lenght);
  }
  this.next();
  return this;
};

next()方法主要是用来判断活跃调用的数量,如果正常,使用内部方法 run 来执行真正的调用。

Bagpipe.prototype.next = function () {
  var that = this;
  if (that.active < that.limit && this.queue.length) {
    var req = this.queue.shift();
    this.run(req.method, req.args);
  }
};
Bagpine.prototype.run = function (method, args) {
  var that = this;
  that.active++;
  var callback = args[args.length - 1];
  var timer = null;
  var called = false;
  args[args.length - 1] = function (err) {
    if (timer) {
      clearTimeout(timer);
      timer = null;
    }
    if (!called) {
      this._next();
      callback.apply(null, arguments);
    } else {
      if (err) {
        that.emit("outdated", err);
      }
    }
  };
  var timeout = that.options.timeout;
  if (timeout) {
    timer = setTimeout(function () {
      called = true;
      that._next();
      // pass the exception
      var err = new Error(timeout + "ms timeout");
      err.name = "BagpipeTimeoutError";
      err.data = {
        name: method.name,
        method: method.toString(),
        args: args.slice(0, -1),
      };
      callback(err);
    }, timeout);
  }
  method.apply(null, args);
};
  • 拒绝模式
    对于大量的异步调用,会分场景进行区分。设计到并发控制,会造成部分等待,如果调用由实时方面的需求,需要快速返回。这种情境下需要快速失败,让调用方竟早返回。
  • 超时控制
    超时控制是为异步调用设置一个时间阈值,如果异步调用没有在规定的时间内完成,先执行用户传入的回调函数。

async解决方案

paralleLimit()和parallel类似,多了一个限制并发数量的参数,使得任务只能同时并发一定数量,不是无限制并发。

async.parallelLimit([
    function(callback) {
        fs.readFile('file1.txt','utf-8',callback)
    },
    function(callback) {
        fs.readFile('file2.txt','utf-8',callback)
    }
], 1, function(err, results) {
    // todo
})

paralleLimit()方法不能动态的增加并行任务。async提供了queue()方法来满足需求。

var q = async.queue(function(file,callback) {
    fs.readFile(file, 'utf-8', callback);
},2)
q.drain = function () {};
fs.readdirSync('.').forEach(function (file) {
    q.push(file, function (err, data) {
    })
})

queue实现了动态添加并行任务,但是想不paralleLimit(),queue是固定的。丢失了paralleLimit()的多样性。


网站公告

今日签到

点亮在社区的每一天
去签到