首页 > 其他分享 >如果有100个请求,如何控制并发?

如果有100个请求,如何控制并发?

时间:2024-03-30 21:01:00浏览次数:22  
标签:resolve const 请求 接口 并发 Promise 100

题目

现有100个请求需要发送,请设计一个算法,使用Promise来控制并发(并发数量最大为10),来完成100个请求;
首先先模拟下 100 个请求:

// 请求列表
const requestList = [];

// 为了方便查看,i从1开始计数
for (let i = 1; i <= 100; i++) {
  requestList.push(
    () =>
      new Promise(resolve => {
        setTimeout(() => {
          console.log('done', i);
          resolve(i);
        }, Math.random() * 1000);
      }),
  );
}

Promise.all()

初次 看到这个问题,相信大部分同学第一个想到的肯定是 Promise.all,因为它是最常见的并发请求方式,下面来实现一下:

const parallelRun = async max => {
  const requestSliceList = [];
  for (let i = 0; i < requestList.length; i += max) {
    requestSliceList.push(requestList.slice(i, i + max));
  }

  for (let i = 0; i < requestSliceList.length; i++) {
    const group = requestSliceList[i];
    try {
      const res = await Promise.all(group.map(fn => fn()));
      console.log('接口返回值为:', res);
    } catch (error) {
      console.error(error);
    }
  }
};

看下效果:
1.gif
效果不错!!
每次都是并发 10 个请求,当这 10 个请求都完成返回时,继续下一个 10 个请求,完美实现需求;
可是此时面试官问:如果这里边有一个请求失败了会怎样?
我:额…,不确定
面试官:回去等通知吧!
顺便说一下,大家好,我是【前端探险家克鲁】。微信公众号、知乎、掘金、CSDN同名,欢迎查看我的个人简介,一起学习提升。

虽然回家等通知了,但这道面试题还是得弄明白,修改下模拟请求,使其随机产生一个错误,修改如下:

// 请求列表
const requestList = [];

for (let i = 1; i <= 100; i++) {
  requestList.push(
    () =>
      new Promise((resolve, reject) => {
        setTimeout(() => {
          if (i === 92) {
            reject(new Error('出错了,出错请求:' + i));
          } else {
            console.log('done', i);
            resolve(i);
          }
        }, Math.random() * 1000);
      }),
  );
}

控制台看下运行结果:
2.gif
有一个请求失败了,这个 Promise.all就失败了,没有返回值
一组中一个请求失败就无法获取改组其他成员的返回值,这对于不需要判断返回值的情况倒是可以,但是实际业务中,返回值是一个很重要的数据
我们可以接受某个接口失败了没有返回值,但是无法接受一个请求失败了,跟它同组的其他 9 个请求也没有返回值
既然,失败的请求会打断 Promise.all,那有没有一种方法可以不被失败打断呢?
还真有,它就是 Promise.allSettled!

Promise.allSettled()

先来看下权威的 MDN 的介绍

Promise.allSettled() 方法是 promise 并发方法之一。在你有多个不依赖于彼此成功完成的异步任务时,或者你总是想知道每个 promise 的结果时,使用 Promise.allSettled()

简单说就是:每个请求都会返回结果,不管失败还是成功
使用 Promise.allSettled()替换下 Promise.all()

const parallelRun = async max => {
  const requestSliceList = [];
  for (let i = 0; i < requestList.length; i += max) {
    requestSliceList.push(requestList.slice(i, i + max));
  }

  for (let i = 0; i < requestSliceList.length; i++) {
    const group = requestSliceList[i];
    try {
      // 使用 allSettled 替换 all
      const res = await Promise.allSettled(group.map(fn => fn()));
      console.log('接口返回值为:', res);
    } catch (error) {
      console.error(error);
    }
  }
};

看下返回结果:
image.png
可以看到,接口全部正常有返回值,返回值中会正常记录当前请求时成功还是失败
不错哦,感觉 Promise.allSettled()就是最优解了!

此时面试官又问:那如果有一个请求非常耗时,会出现什么情况?
答:有一个请求非常耗时,那组的请求返回就会很慢,会阻塞了后续的接口并发。
面试官:有没有什么方法可以解决这个问题?
我 :额… 不知道…
面试官:回去等通知吧~~~

最优解

分析问题

使用 Promise.all()或是 Promise.allSettled(),每次并发 10 个请求,确实可以满足并发要求,但是效率较低:如果存在一个或多个慢接口,那么会出现以下两个问题:

  • 有慢接口的并发组返回会很慢,一个慢接口拖慢了其他 9 个接口,得不偿失
  • 本来我们是可以并发 10 个请求的,但是一个慢接口导致该组的其他 9 个并发位置都被浪费了,这会导致这 100 个接口的并发时间被无情拉长
  • 慢接口组后续的并发组都被阻塞了,更慢了

解决方法

有没有办法解决上述问题呢,答案是肯定的:
可以维护一个运行池和一个等待队列,运行池始终保持 10 个请求并发,
当运行池中有一个请求完成时,就从等待队列中拿出一个新请求放到运行池中运行,这样就可以保持运行池始终是满负荷运行,
即使有一个慢接口,也不会阻塞后续的接口入池

代码实现

// 运行池
const pool = new Set();

// 等待队列
const waitQueue = [];

/**
 * @description: 限制并发数量的请求
 * @param {*} reqFn:请求方法
 * @param {*} max:最大并发数
 */
const request = (reqFn, max) => {
  return new Promise((resolve, reject) => {
    // 判断运行吃是否已满
    const isFull = pool.size >= max;

    // 包装的新请求
    const newReqFn = () => {
      reqFn()
        .then(res => {
          resolve(res);
        })
        .catch(err => {
          reject(err);
        })
        .finally(() => {
          // 请求完成后,将该请求从运行池中删除
          pool.delete(newReqFn);
          // 从等待队列中取出一个新请求放入等待运行池执行
          const next = waitQueue.shift();
          if (next) {
            pool.add(next);
            next();
          }
        });
    };

    if (isFull) {
      // 如果运行池已满,则将新的请求放到等待队列中
      waitQueue.push(newReqFn);
    } else {
      // 如果运行池未满,则向运行池中添加一个新请求并执行该请求
      pool.add(newReqFn);
      newReqFn();
    }
  });
};

requestList.forEach(async item => {
  const res = await request(item, 10);
  console.log(res);
});

效果

3.gif
可以看到,100 个接口不断执行,并没有任何等待或是被阻塞的现象,完美!

其他优秀库

社区已有很多优秀的并发限制库,这里重点介绍下 p-limit
安装:

npm install p-limit -S

使用方法:

import plimit from 'p-limit';


const limit = plimit(10);

requestList.forEach(async item => {
  const res = await limit(item);
  console.log(res);
});

运行效果与上面的队列的运行效果是一致的。下面看下库源码(精简后):

import Queue from 'yocto-queue';

export default function pLimit(concurrency) {
  const queue = new Queue();
  let activeCount = 0;

  const next = () => {
    activeCount--;

    if (queue.size > 0) {
      queue.dequeue()();
    }
  };

  const run = async (function_, resolve, arguments_) => {
    activeCount++;

    const result = (async () => function_(...arguments_))();

    resolve(result);

    try {
      await result;
    } catch {}

    next();
  };

  const enqueue = (function_, resolve, arguments_) => {
    queue.enqueue(run.bind(undefined, function_, resolve, arguments_));

    (async () => {
      // This function needs to wait until the next microtask before comparing
      // `activeCount` to `concurrency`, because `activeCount` is updated asynchronously
      // when the run function is dequeued and called. The comparison in the if-statement
      // needs to happen asynchronously as well to get an up-to-date value for `activeCount`.
      await Promise.resolve();

      if (activeCount < concurrency && queue.size > 0) {
        queue.dequeue()();
      }
    })();
  };

  const generator = (function_, ...arguments_) =>
    new Promise(resolve => {
      enqueue(function_, resolve, arguments_);
    });

  return generator;
}

短短 60 行代码就实现了一个功能强大的并发处理库,真是厉害,下面分析下具体实现:

  • 首先 p-limit 库默认导出一个函数pLimit,该函数接收一个数字,表示最大并发数
  • pLimit函数函数返回一个 generator函数,该函数返回一个 Promise,并且其中调用了 enqueue函数
  • enqueue 函数主要是将 run函数加入队列 queue中,之后判断下 activeCount < concurrency && queue.size > 0,表示当前队列大小小于最大并发数且队列不为空,则需要从队列中取出一个请求执行,即执行run函数
  • run函数执行时需要先将 activeCount加一,之后执行真正的请求函数 (async () => function_(...arguments_))()
  • 之后等待请求完成 await result; 之后执行 next函数
  • next函数主要从队列中取出一个新请求执行并将activeCount 减一

总结

本文主要总结了 100 个请求限制并发的方法:

  • Promise.all() 最简单的控制并发,但是请求出错会导致该组无返回值
  • Promise.allSettled() 解决了Promise.all()的问题,但是却存在慢接口阻塞后续请求,且浪费其余并发位置的问题
  • 通过维护一个运行池,当运行池中有请求完成时便从等待队列中取一个心情求入池执行,直到所有的请求都入池
  • 介绍了社区的 p-limit库的使用方法和实现原理

标签:resolve,const,请求,接口,并发,Promise,100
From: https://blog.csdn.net/y_ang_1/article/details/137151838

相关文章

  • 说说 HTTP 常见的请求头有哪些? 作用?
    一、是什么HTTP头字段(HTTPheaderfields),是指在超文本传输协议(HTTP)的请求和响应消息中的消息头部分它们定义了一个超文本传输协议事务中的操作参数HTTP头部字段可以自己根据需要定义,因此可能在 Web服务器和浏览器上发现非标准的头字段下面是一个HTTP请求的请求头:GET/hom......
  • 【洛谷】P1004 [NOIP2000提高组]方格取数
    题目描述题目描述设有N×N 的方格图(N≤9),我们将其中的某些方格中填入正整数,而其他的方格中则放入数字 0。如下图所示(见样例):某人从图的左上角的 A 点出发,可以向下行走,也可以向右走,直到到达右下角的 B 点。在走过的路上,他可以取走方格中的数(取走后的方格中将变为......
  • 处理并发冲突
    处理并发冲突项目2023/10/0512个参与者反馈本文内容开放式并发本机数据库生成的并发令牌应用程序管理的并发令牌解决并发冲突显示另外2个提示可在GitHub上查看此文章的示例。在大多数情况下,数据库会由多个应用程序实例并发使用,每个实例对数据执行独立修改。在同一时间修......
  • NET Core使用Grpc通信(一):一元请求
    gRPC是一个现代的开源高性能远程过程调用(RPC)框架,它可以高效地连接数据中心内和跨数据中心的服务,支持负载平衡、跟踪、运行状况检查和身份验证。gRPC通过使用ProtocolBuffers作为数据传输格式,实现了在不同平台上的通信,并支持双向流和流式传输。RPC是远程过程调用的缩写,实现......
  • Java(2) ----- 异常、多线程、同步安全、死锁、并发包、Lambda表达式、Stream流
    异常方法默认都可以自动抛出运行时异常!自定义异常:(1)自定义编译时异常1、定义一个异常类继承Exception2、重写构造器3、在出现异常的地方用thrownew自定义对象抛出4、编译时异常是编译阶段就报错,提醒跟家强烈,一定需要处理!(2)自定义运行时异常1、定义一个异常类继承RunTimeE......
  • 【力扣hot100】160.相交链表
    相交链表给你两个单链表的头节点headA和headB,请你找出并返回两个单链表相交的起始节点。如果两个链表不存在相交节点,返回null。图示两个链表在节点c1开始相交:题目数据保证整个链式结构中不存在环。注意,函数返回结果后,链表必须保持其原始结构。示例1:输......
  • 【并发编程】线程的基础概念
    一、基础概念1.1进程与线程A什么是进程?进程是指运行中的程序。比如我们使用钉钉,浏览器,需要启动这个程序,操作系统会给这个程序分配一定的资源(占用内存资源)。什么线程?线程是CPU调度的基本单位,每个线程执行的都是某一个进程的代码的某个片段。举个栗子:房子与人比如现......
  • Vue3+Vite+Axios Request 请求封装(TS版本)最新
    Vue3+Vite+AxiosRequest请求封装(TS版本)http>index.ts请求封装/**@Date:2024-03-3012:37:05*@LastEditors:zhong*@LastEditTime:2024-03-3014:12:52*@FilePath:\app-admin\src\http\index.ts*/importaxios,{AxiosInstance,AxiosRequestCon......
  • yii2请求组件
    yii2请求组件应用的请求是用yii\web\Request对象来表示的请求参数$request=Yii::$app->request;$get=$request->get();//等价于:$get=$_GET;$id=$request->get('id');//等价于:$id=isset($_GET['id'])?$_GET['id']:null;$i......
  • 请求转发404
    请求的资源[/BookShopping/BookShopping/PayServlet]不可用RequestDispatcherdispatcher=request.getRequestDispatcher("/BookShopping/PayServlet");dispatcher.forward(request,response);这里是请求转发,但我写的路径是    /BookShopping/PayServletgetR......