首页 > 其他分享 >聊聊项目中如何实现请求聚合

聊聊项目中如何实现请求聚合

时间:2024-08-06 09:30:22浏览次数:8  
标签:username code 聚合 请求 success Result 聊聊 com User

前言

什么是请求聚合

见名之意就是将多次的请求整合为一个请求处理

如何实现请求聚合

有个快手大佬开源了一个工具类:buffer-trigger,这玩意就可以用来做请求聚合。

buffer-trigger适用场景

  1. 高吞吐量消息处理: 当系统需要处理大量快速产生的数据或消息时,如日志记录、事件追踪、实时交易数据等,单条消息的即时处理可能会导致过多的系统开销(如网络通信、数据库操作等)。通过使用BufferTrigger,可以将这些消息暂时缓存在阻塞队列中,累积到一定数量后一次性进行批量处理。这样既能减少系统调用次数,提升整体处理效率,又能降低对下游系统的瞬时压力。

  2. 延迟敏感但允许适度延后处理: 在某些业务场景中,数据或消息的处理虽有一定的时效性要求,但并不严格到需要立即响应。例如,用户行为分析、运营统计报表生成等任务,可以在容忍的时间窗口内完成。BufferTrigger通过设置批处理阈值和延迟等待时间,允许在满足一定积累量或等待时间后才触发消费,从而实现数据的“准实时”处理,兼顾了处理效率与延迟需求。

  3. 资源优化与成本控制: 对于依赖付费服务(如云存储、API调用)或者计算资源有限的情况,批量处理能够显著减少对外部服务的调用量或内部计算资源的占用。例如,定期向云存储批量上传日志文件、批量发送电子邮件通知、批量查询外部API并聚合结果等。BufferTrigger通过合并多个小任务为一个大任务,有助于降低单位数据处理的成本。

  4. 避免频繁IO操作: 若消息的消费涉及大量的磁盘IO、网络IO或其他昂贵的系统资源操作,如数据库写入、文件写入、跨网络的数据同步等,频繁的单个操作可能导致性能瓶颈。BufferTrigger通过批量处理,能够减少这类操作的次数,从而提高系统整体性能。

  5. 微服务间解耦与流量控制: 在分布式微服务架构中,不同服务之间可能存在强依赖关系。使用BufferTrigger可以在服务间引入一层缓冲,避免下游服务瞬时过载或临时不可用导致整个系统崩溃。同时,批量处理能平滑消费端的请求流量,减轻对上游服务的压力,增强系统的稳定性和容错能力。

如何使用buffer-trigger

1、在项目的pom中引入buffer-trigger GAV

<dependency>
  <groupId>com.github.phantomthief</groupId>
  <artifactId>buffer-trigger</artifactId>
  <version>0.2.9</version>
</dependency>

2、使用案例一:使用SimpleBufferTrigger

/**
 * {@link BufferTrigger}的通用实现,适合大多数业务场景
 * <p>
 * 消费触发策略会考虑消费回调函数的执行时间,实际执行间隔 = 理论执行间隔 - 消费回调函数执行时间;
 * 如回调函数执行时间已超过理论执行间隔,将立即执行下一次消费任务.
 *
 * @author w.vela
 */

示例:

public class BufferTriggerDemo {
     BufferTrigger<Long> bufferTrigger = BufferTrigger.<Long, Map<Long, AtomicInteger>> simple()
            .maxBufferCount(10)
            .interval(4, TimeUnit.SECONDS)
            .setContainer(ConcurrentHashMap::new, (map, uid) -> {
                map.computeIfAbsent(uid, key -> new AtomicInteger()).addAndGet(1);
                return true;
            })
            .consumer(this::consumer)
            .build();



    public void consumer(Map<Long, AtomicInteger> map) {
        System.out.println(map);
    }

    public void test() throws InterruptedException {
        // 进程退出时手动消费一次
        Runtime.getRuntime().addShutdownHook(new Thread(() -> bufferTrigger.manuallyDoTrigger()));
        // 最大容量是10,这里尝试添加11个元素0-10
        for (int i = 0; i < 5; i ++) {
            for (long j = 0; j < 11; j ++) {
                bufferTrigger.enqueue(j);
            }
        }

        Thread.sleep(7000);
    }

参数描述

  • maxBuffeCount(long count):
    指定容器最大容量,比如这里指定了10,当在下次聚合前容器元素数量达到10就无法添加了,-1表示无限制;
  • internal(longinterval, TimeUnit unit) :表示多久聚合一次,如果没达到时间那么consumer是不会输出的,聚合后容器就空了。
  • setContainer(Supplier<? extends C> factory, BiPredicate<? super C, ? super E> queueAdder):
    第一个变量为factory,是个Supplier,获取容器用的,要求线程安全;第二个变量是缓存更新的方法BiPredicate<?
    super C, ? super E> queueAdder C为容器类型,E为元素类型
  • consumer(ThrowableConsumer<? super C, Throwable> consumer):
    表示如何消费聚合后的数据,标识我们如何去消费聚合后的数据,我这里就是简单打印。 enqueue(E element): 添加元素;
  • manuallyDoTrigger: 主动触发一次消费,通常在java进程关闭的时候调用

2、使用案例二:使用BatchConsumeBlockingQueueTrigger

/**
 * {@link BufferTrigger}基于阻塞队列的批量消费触发器实现.
 * <p>
 * 该触发器适合生产者-消费者场景,缓存容器基于{@link LinkedBlockingQueue}队列实现.
 * <p>
 * 触发策略类似Kafka linger,批处理阈值与延迟等待时间满足其一即触发消费回调.
 * @author w.vela
 */

示例:

public class BufferTriggerDemo2 {
     BufferTrigger<Long> bufferTrigger = BufferTrigger.<Long>batchBlocking()
             .bufferSize(50)
             .batchSize(10)
             .linger(Duration.ofSeconds(1))
             .setConsumerEx(this::consume)
             .build();

    private void consume(List<Long> nums) {
        System.out.println(nums);
    }

    public void test() throws InterruptedException {
        // 进程退出时手动消费一次
        Runtime.getRuntime().addShutdownHook(new Thread(() -> bufferTrigger.manuallyDoTrigger()));
        for (long j = 0; j < 60; j ++) {
            bufferTrigger.enqueue(j);
        }

        Thread.sleep(7000);
    }

  • batchBlocking():提供自带背压(back-pressure)的简单批量归并消费能力;
  • bufferSize(intbufferSize): 缓存队列的最大容量; batchSize(int size): 批处理元素的数量阈值,达到这个数量后也会进行消费
  • linger(Duration duration): 多久消费一次
  • setConsumerEx(ThrowableConsumer<?
    super List, Exception> consumer)
    : 消费函数,注入的对象为缓存队列中尚存的所有元素,非逐个元素消费;

3、两种实现方式在使用上的区别

BatchConsumeBlockingQueueTrigger每次将元素原封不动保存下来,然后一次性消费一整个列表元素。而SimpleBufferTrigger,每次添加元素都会进行计算。

以上示例摘抄该博文https://juejin.cn/post/7160569936576774181
这篇文章比较详细对请求聚合以及buffer-trigger进行了介绍

更多buffer-trigger内容可以看官方源码注释以及相应的单元测试案例
https://github.com/PhantomThief/buffer-trigger

以上就是buffer-trigger的使用教程,不过如果只是写到这边,就没啥意思了,下面就以一个实战的例子,来演示下如何实现请求聚合

案例

注: 以一个批量注册用户为例子,来演示请求聚合。案例将buffer-trigger与springboot做了一个整合。案例只列出核心代码,完整示例查看文末demo链接

1、项目中引入buffer-trigger GAV

 <dependency>
            <groupId>com.github.phantomthief</groupId>
            <artifactId>buffer-trigger</artifactId>
            <version>${buffer.trigger.version}</version>
        </dependency>

2、封装请求参数类

@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class DataExchange<T,R> {

    private String bizNo;
    private T request;
    private CompletableFuture<Result<R>> response;
}

3、封装buffer-trigger处理类

@RequiredArgsConstructor
public class DelegateBatchConsumerTriggerHandler<T, R> implements BatchConsumerTriggerHandler<T, R>{


    private final BufferTrigger<DataExchange<T, R>> bufferTrigger;



    @SneakyThrows
    @Override
    public Result<R> handle(T request, String bizNo) {
        DataExchange dataExchange = new DataExchange<>();
        dataExchange.setBizNo(bizNo);
        dataExchange.setRequest(request);
        CompletableFuture<Result> response = new CompletableFuture<>();
        dataExchange.setResponse(response);
        bufferTrigger.enqueue(dataExchange);
        return response.get();
    }


    @Override
    public void closeBufferTrigger() {
        // 触发该事件,关闭BufferTrigger,并将未消费的数据消费
       if(bufferTrigger != null){
           bufferTrigger.close();
       }
    }
}

4、封装buffer-trigger创建工厂

public interface BatchConsumerTriggerFactory {

    default <T,R> BatchConsumerTriggerBuilder<DataExchange<T,R>> builder(){
        return null;
    }

    default <T,R> BufferTrigger<DataExchange<T,R>> getTrigger(ThrowableConsumer<List<DataExchange<T,R>>, Exception> consumer, String bufferTriggerBizType){
        if(!support(bufferTriggerBizType)){
           return null;
        }
       return builder().setConsumerEx(consumer).build();
    }

    boolean support(String bufferTriggerBizType);

    default <T,R> BatchConsumerTriggerHandler<T,R> getTriggerHandler(ThrowableConsumer<List<DataExchange<T,R>>, Exception> consumer, String bufferTriggerBizType){
        BufferTrigger<DataExchange<T, R>> trigger = getTrigger(consumer, bufferTriggerBizType);
        return new DelegateBatchConsumerTriggerHandler<>(trigger);
    }


}

5、模拟用户注册dao

@Repository
public class UserDao {
    private final Map<Long, User> userMap = new ConcurrentHashMap<>();
    private final ThreadLocalRandom random = ThreadLocalRandom.current();
    private final LongAdder idAdder = new LongAdder();

    public User register(UserDTO userDTO){
        mockExecuteCostTime();
        return getUser(userDTO);
    }


    public List<User> batchRegister(List<UserDTO> userDTOs){
        mockExecuteCostTime();
        List<User> users = new ArrayList<>();
        userDTOs.forEach(userDTO -> users.add(getUser(userDTO)));
        return users;
    }

6、模拟用户注册service

a、 常规方式

@Service
@RequiredArgsConstructor
public class UserServiceImpl implements UserService {

    private final UserDao userDao;
    private final LongAdder count = new LongAdder();
    @Override
    public Result<User> register(UserDTO user) {
        count.increment();
        System.out.println("执行次数:" + count.sum());

        return Result.success(userDao.register(user));
    }
    }

b、 请求聚合方式

前置条件: 需在yml指定相关队列、定时器配置以及业务类别

lybgeek:
  buffer:
    trigger:
      consume-queue-trigger-properties:
        - bufferTriggerBizType: userReisgeter
          config:
            batchSize: 100
            bufferSize: 1000
            batchConsumeIntervalMills: 1000

@Service
@RequiredArgsConstructor
public class UserServiceBufferTriggerImpl implements UserService, InitializingBean, DisposableBean {
    public static final String BUFFER_TRIGGER_BIZ_TYPE = "userReisgeter";

    private final UserDao userDao;

    private final BatchConsumerTriggerFactory batchConsumerTriggerFactory;

    private BatchConsumerTriggerHandler<UserDTO,User> batchConsumerTriggerHandler;


    private final LongAdder count = new LongAdder();

    @SneakyThrows
    @Override
    public Result<User> register(UserDTO user) {
       return batchConsumerTriggerHandler.handle(user,BUFFER_TRIGGER_BIZ_TYPE + "-" + UUID.randomUUID());
    }

 
    @Override
    public void afterPropertiesSet() throws Exception {
        // key为业务属性唯一键,如果不存在业务属性唯一键,则可以取bizNo作为key,示例以username作为唯一键
        Map<String, CompletableFuture<Result<User>>> completableFutureMap = new HashMap<>();
        batchConsumerTriggerHandler = batchConsumerTriggerFactory.getTriggerHandler((ThrowableConsumer<List<DataExchange<UserDTO, User>>, Exception>) dataExchanges -> {
            List<UserDTO> userDTOs = new ArrayList<>();
            for (DataExchange<UserDTO, User> dataExchange : dataExchanges) {
                UserDTO userDTO = dataExchange.getRequest();
                completableFutureMap.put(userDTO.getUsername(),dataExchange.getResponse());
                userDTOs.add(userDTO);
            }
            count.increment();
            System.out.println("执行次数:" + count.sum());
            List<User> users = userDao.batchRegister(userDTOs);
            if(CollectionUtil.isNotEmpty(users)){
                for (User user : users) {
                    CompletableFuture<Result<User>> completableFuture = completableFutureMap.remove(user.getUsername());
                    if(completableFuture != null){
                        completableFuture.complete(Result.success(user));
                    }
                }
            }

        },BUFFER_TRIGGER_BIZ_TYPE);


    }

    @Override
    public void destroy() throws Exception {
        // 触发该事件,关闭BufferTrigger,并将未消费的数据消费
        batchConsumerTriggerHandler.closeBufferTrigger();
    }
}

7、分别开启20个线程,对常规方式以及聚合方式的service进行测试

a、 常规方式

  @Test
    public void testRegisterUserByCommon() throws IOException {
      new ConcurrentCall(20).run(()->{
          UserDTO user = UserUtil.generateUser();
          return userServiceImpl.register(user);
      });
    }

控制台输出

执行次数:1
执行次数:2
执行次数:7
执行次数:6
执行次数:10
执行次数:9
执行次数:5
执行次数:4
执行次数:11
执行次数:12
执行次数:3
执行次数:8
执行次数:17
执行次数:16
执行次数:15
执行次数:18
执行次数:14
执行次数:20
执行次数:13
执行次数:19
Result(code=200, msg=success, data=User(id=1, username=yangweize, fullname=杨伟泽, age=12, [email protected], mobile=64294835455))
Result(code=200, msg=success, data=User(id=3, username=yaojinpeng, fullname=姚晋鹏, age=13, [email protected], mobile=5381-03836251))
Result(code=200, msg=success, data=User(id=9, username=pengxiaoran, fullname=彭潇然, age=25, [email protected], mobile=903-85787160))
Result(code=200, msg=success, data=User(id=9, username=guoweize, fullname=郭伟泽, age=9, [email protected], mobile=57105382845))
Result(code=200, msg=success, data=User(id=8, username=huangjinyu, fullname=黄瑾瑜, age=29, [email protected], mobile=449-27085386))
Result(code=200, msg=success, data=User(id=6, username=renkairui, fullname=任楷瑞, age=3, [email protected], mobile=2777-67842072))
Result(code=200, msg=success, data=User(id=2, username=fuhaoran, fullname=傅昊然, age=15, [email protected], mobile=332-47390793))
Result(code=200, msg=success, data=User(id=5, username=linmingxuan, fullname=林明轩, age=27, [email protected], mobile=116-31209336))
Result(code=200, msg=success, data=User(id=5, username=shensicong, fullname=沈思聪, age=6, [email protected], mobile=0532-05033168))
Result(code=200, msg=success, data=User(id=11, username=gongtianyu, fullname=龚天宇, age=4, [email protected], mobile=9752-26976731))
Result(code=200, msg=success, data=User(id=13, username=xiongminghui, fullname=熊明辉, age=23, [email protected], mobile=0049-21709250))
Result(code=200, msg=success, data=User(id=17, username=huzhize, fullname=胡志泽, age=0, [email protected], mobile=760-85426527))
Result(code=200, msg=success, data=User(id=16, username=gaosiyuan, fullname=高思源, age=5, [email protected], mobile=42452304656))
Result(code=200, msg=success, data=User(id=13, username=mojiaxi, fullname=莫嘉熙, age=2, [email protected], mobile=7264-82263592))
Result(code=200, msg=success, data=User(id=18, username=caizimo, fullname=蔡子默, age=12, [email protected], mobile=2653-82403850))
Result(code=200, msg=success, data=User(id=10, username=wancongjian, fullname=万聪健, age=10, [email protected], mobile=954-37654583))
Result(code=200, msg=success, data=User(id=14, username=gongyuebin, fullname=龚越彬, age=0, [email protected], mobile=77884047173))
Result(code=200, msg=success, data=User(id=15, username=fenghongtao, fullname=冯鸿涛, age=2, [email protected], mobile=8832-09658213))
Result(code=200, msg=success, data=User(id=19, username=jiangyuanbo, fullname=江苑博, age=12, [email protected], mobile=2132-90700641))
Result(code=200, msg=success, data=User(id=20, username=xiaoxinlei, fullname=萧鑫磊, age=13, [email protected], mobile=02196775183))

b、 聚合请求方式

  @Test
    public void testRegisterUserByBufferTrigger() throws IOException {
        new ConcurrentCall(20).run(()->{
            UserDTO user = UserUtil.generateUser();
            return userServiceBufferTriggerImpl.register(user);
        });
    }

控制台输出

执行次数:1
Result(code=200, msg=success, data=User(id=1, username=heguo, fullname=何果, age=10, [email protected], mobile=5725-06130005))
Result(code=200, msg=success, data=User(id=7, username=houwen, fullname=侯文, age=9, [email protected], mobile=85830365362))
Result(code=200, msg=success, data=User(id=11, username=yangxiaoyu, fullname=杨笑愚, age=5, [email protected], mobile=13776594491))
Result(code=200, msg=success, data=User(id=3, username=yusimiao, fullname=余思淼, age=5, [email protected], mobile=070-18231344))
Result(code=200, msg=success, data=User(id=12, username=haotianyu, fullname=郝天宇, age=10, [email protected], mobile=42693432247))
Result(code=200, msg=success, data=User(id=14, username=wangxinpeng, fullname=汪鑫鹏, age=1, [email protected], mobile=59660609063))
Result(code=200, msg=success, data=User(id=15, username=tanzhichen, fullname=覃智宸, age=25, [email protected], mobile=075-00624335))
Result(code=200, msg=success, data=User(id=4, username=lu:haoxuan, fullname=吕皓轩, age=14, email=lu:[email protected], mobile=9548-30583153))
Result(code=200, msg=success, data=User(id=2, username=qiuyinxiang, fullname=邱胤祥, age=18, [email protected], mobile=04148786960))
Result(code=200, msg=success, data=User(id=5, username=weiweicheng, fullname=魏伟诚, age=25, [email protected], mobile=0960-77489940))
Result(code=200, msg=success, data=User(id=20, username=tanbin, fullname=谭彬, age=27, [email protected], mobile=297-57401738))
Result(code=200, msg=success, data=User(id=18, username=husiyuan, fullname=胡思远, age=24, [email protected], mobile=0809-08658163))
Result(code=200, msg=success, data=User(id=16, username=shishengrui, fullname=石晟睿, age=26, [email protected], mobile=8205-70004359))
Result(code=200, msg=success, data=User(id=17, username=lu:zihan, fullname=吕子涵, age=0, email=lu:[email protected], mobile=162-35081974))
Result(code=200, msg=success, data=User(id=19, username=xionghaoran, fullname=熊昊然, age=19, [email protected], mobile=588-09693393))
Result(code=200, msg=success, data=User(id=13, username=jiangyuebin, fullname=姜越彬, age=19, [email protected], mobile=472-74492380))
Result(code=200, msg=success, data=User(id=8, username=haoweicheng, fullname=郝伟诚, age=26, [email protected], mobile=73205366322))
Result(code=200, msg=success, data=User(id=10, username=tanhongxuan, fullname=谭鸿煊, age=18, [email protected], mobile=78536254981))
Result(code=200, msg=success, data=User(id=9, username=xielicheng, fullname=谢立诚, age=18, [email protected], mobile=4364-05053591))
Result(code=200, msg=success, data=User(id=6, username=weiluyang, fullname=韦鹭洋, age=28, [email protected], mobile=92876761170))

c、 结果分析
常规方式需要调用20次,将结果返回。聚合方式仅需调用一次,就将结果返回

总结

本文主要讲解如何进行请求聚合,请求聚合主要适用于那些需要高效、批量处理数据或消息,并且对处理延迟有一定容忍度的场景。

我们在使用请求聚合时,相关的下游最好能提供批量接口

其次BufferTrigger是单线程消费,在并发很高的场景下可能会出现消费速度跟不上生产速度,这很容易导致full gc问题。所以如果有必要的话需要使用线程池来提升消费速度

demo链接

https://github.com/lyb-geek/springboot-learning/tree/master/springboot-buffer-trigger

标签:username,code,聚合,请求,success,Result,聊聊,com,User
From: https://www.cnblogs.com/linyb-geek/p/18121753

相关文章

  • 聊聊JVM如何优化
       首先应该明确的是JVM调优不是常规手段,JVM的存在本身就是为了减轻开发对于内存管理的负担,当出现性能问题的时候第一时间考虑的是代码逻辑与设计方案,以及是否达到依赖中间件的瓶颈,最后才是针对JVM进行优化。1.JVM内存模型针对JAVA8的模型进行讨论,JVM的内存模型主要分为......
  • C# 使用Flurl http请求处理流式响应
    AI对话接口采用流式返回,使用Flurl处理返回的数据流usingFlurl;usingFlurl.Http;[HttpPost]publicasyncTask<string>GetLiushiChatLaw(){//1、请求参数,根据实际情况YourModelrequest=newYourModel();stringallStr="";stringchatLawApiUrl="ht......
  • Python 网络抓取与请求和美丽的汤被需要 javascript 阻止
    我正在尝试从网站上抓取文本。我使用简单的代码:requests.get(url_here)。我的代码直到最近才有效。现在,当我使用请求时,我收到一条消息,而不是获取网站的文本:“该网站需要启用JavaScript!您使用的浏览器不支持JavaScript,或者已关闭JavaScript。“我已验证我的浏览器确实......
  • [C++] 简单解析http请求
    #include<iostream>#include<string>#include<map>#include<vector>#include<regex>classHttpRequest{public:enumMethod{GET,POST,UNKNOWN};enumError{SUCCESS,......
  • Django5+Vue3:OA系统前后端分离项目实战-异步优化Ajax请求(12)
    Django5+Vue3系列文章前言本节开始,全文仅对会员开放。若点赞和收藏数量超过100,全文将免费开放。此项目采用Django框架的5.0.7版本进行开发。Django5.0支持的Python版本为3.10、3.11和3.12。OA系统系列文章将持续更新,直至项目的Docker部署阶段。专栏链接:......
  • Flask 应用程序的 POST 请求出现 405 method not allowed 错误
    我有一个简单的Web应用程序,可以使用以下代码向选定的受访者发送消息(使用TwilioAPI):app.pyclient=Client(account_sid,auth_token)@app.route('/')defindex():returnrender_template('index.html')@app.route('/send_sms',methods=['POST......
  • 为什么有些POST请求会发送两次?
    今天在浏览博客时,我发现了一个有趣的问题:“为什么POST请求会发送两次?”之前我从未注意到这个问题。我的第一反应是,是否是因为防抖处理不当导致POST请求被发送两次。......
  • 【转载】科研写作入门 —— 聊聊Science Research Writing for non-native Speakers o
    原地址:https://zhuanlan.zhihu.com/p/623882027平行侠:今天我们聊一聊ScienceResearchWritingfornon-nativeSpeakersofEnglish这本书,我博士毕业发的TIP论文就是看了这本书之后才写出来的,我太爱这本书了,请你给我们介绍一下吧。AI:非常高兴听到您对这本书的好评!《S......
  • aiohttp 存在超时问题,但请求没有超时问题
    我正在尝试使用两种不同的方法在Python中获取网页:requests和aiohttprequests方法工作正常,但aiohttp方法会导致超时。代码如下:importasyncioimportaiohttpimportrequestsheaders={'User-Agent':'Mozilla/5.0(WindowsNT10.0;Win64;x6......
  • Python 请求 POST 请求与 websockets 库一起使用时挂起
    我使用Python中的requests库发送POST请求,同时维护与websockets库的WebSocket连接:importasyncioimportrequestsimportwebsocketsasyncdefwebsocket_handler(uri):asyncwithwebsockets.connect(uri)aswebsocket:whileTrue:me......