首页 > 其他分享 >Mutiny Uni阻塞操作变响应式的方法示例

Mutiny Uni阻塞操作变响应式的方法示例

时间:2023-11-17 18:44:05浏览次数:41  
标签:11 MutinyExample df dg ForkJoinPool 示例 Mutiny Uni commonPool

以下代码片段为模拟一个这样的操作:
在多台服务器上下载文件列表内的文件。其中,获取服务器、获取文件列表、在服务器执行下载操作均为阻塞方法。



import cn.hutool.core.collection.CollUtil;
import io.quarkus.test.junit.QuarkusTest;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.tuples.Tuple2;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.function.Function;

@QuarkusTest
public class MutinyExample {

    private static final Logger log = LoggerFactory.getLogger(MutinyExample.class);


    @Test
    void test() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        execute()
                // 为方便测试,收到完成事件的时候做些处理
                .onCompletion().call(() -> {
                    log.info("执行完成了啊");
                    // 收到完成事件就可以释放了
                    latch.countDown();
                    return Uni.createFrom().voidItem();
                })
                // 一定要订阅,订阅才能触发执行
                .subscribe().with(log::info);

        // 阻塞住,防止测试退出看不到效果
        latch.await();
    }

    public Multi<String> execute() {
        // 响应式获取文件列表,这里是把阻塞操作变成响应式操作
        Uni<List<String>> fileListUni = Uni.createFrom().completionStage(CompletableFuture.supplyAsync(() -> getFileList()));
        // 响应式获取服务器列表
        Uni<List<String>> clusterListUni = Uni.createFrom().completionStage(CompletableFuture.supplyAsync(() -> getClusterList()));


        return Uni
                // 将两个数据流合并成一个数据流
                .combine().all().unis(fileListUni, clusterListUni)
                // 放到tuple里面
                .asTuple()
                // 我想在这里遍历服务器执行命令,所以我要把Uni转成一个Multi<服务器>这种格式
                .onItem().transformToMulti((Function<Tuple2<List<String>, List<String>>, Multi<String>>) objects -> {
                    List<String> fileList = objects.getItem1();
                    List<String> clusterList = objects.getItem2();

                    // 创建Multi<服务器>
                    return Multi.createFrom().items(clusterList.stream())
                            // 异步执行一个阻塞操作,遍历服务器执行一段命令
                            .onItem().call(server -> {
                                CompletableFuture<?> future = CompletableFuture.runAsync(() -> {
                                    log.info("执行阻塞操作");
                                    // 在这里执行命令操作
                                    log.info("{} 下载文件: {}", server, fileList);
                                    // 模拟阻塞
                                    try {
                                        Thread.sleep(1000);
                                    } catch (InterruptedException e) {
                                        throw new RuntimeException(e);
                                    }
                                    log.info("{} 下载文件完成", server);
                                });

                                // 这里返回什么其实并不重要,call并不关心,call只要知道操作执行完了就行了
                                return Uni.createFrom().completionStage(future);
                            });
                });

    }

    /**
     * 获取文件列表
     * 这是一个阻塞操作
     */
    private List<String> getFileList() {
        log.info("获取文件列表");
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        log.info("获取文件列表完成");
        return CollUtil.newArrayList("file1", "file2");
    }

    /**
     * 获取服务器列表
     * 这是一个阻塞操作
     */
    private List<String> getClusterList() {
        log.info("获取服务器列表");
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        log.info("获取服务器列表完成");
        return CollUtil.newArrayList("cluster1", "cluster2");
    }
}


输出结果

2023-11-17 18:31:58,805 INFO  [com.dg.df.rda.dep.MutinyExample] (ForkJoinPool.commonPool-worker-7) 获取文件列表
2023-11-17 18:31:58,805 INFO  [com.dg.df.rda.dep.MutinyExample] (ForkJoinPool.commonPool-worker-6) 获取服务器列表
2023-11-17 18:31:59,808 INFO  [com.dg.df.rda.dep.MutinyExample] (ForkJoinPool.commonPool-worker-6) 获取服务器列表完成
2023-11-17 18:31:59,810 INFO  [com.dg.df.rda.dep.MutinyExample] (ForkJoinPool.commonPool-worker-7) 获取文件列表完成
2023-11-17 18:31:59,818 INFO  [com.dg.df.rda.dep.MutinyExample] (ForkJoinPool.commonPool-worker-6) 执行阻塞操作
2023-11-17 18:31:59,821 INFO  [com.dg.df.rda.dep.MutinyExample] (ForkJoinPool.commonPool-worker-6) cluster1 下载文件: [file1, file2]
2023-11-17 18:32:00,823 INFO  [com.dg.df.rda.dep.MutinyExample] (ForkJoinPool.commonPool-worker-6) cluster1 下载文件完成
2023-11-17 18:32:00,824 INFO  [com.dg.df.rda.dep.MutinyExample] (ForkJoinPool.commonPool-worker-6) cluster1
2023-11-17 18:32:00,825 INFO  [com.dg.df.rda.dep.MutinyExample] (ForkJoinPool.commonPool-worker-7) 执行阻塞操作
2023-11-17 18:32:00,825 INFO  [com.dg.df.rda.dep.MutinyExample] (ForkJoinPool.commonPool-worker-7) cluster2 下载文件: [file1, file2]
2023-11-17 18:32:01,848 INFO  [com.dg.df.rda.dep.MutinyExample] (ForkJoinPool.commonPool-worker-7) cluster2 下载文件完成
2023-11-17 18:32:01,869 INFO  [com.dg.df.rda.dep.MutinyExample] (ForkJoinPool.commonPool-worker-7) cluster2
2023-11-17 18:32:01,869 INFO  [com.dg.df.rda.dep.MutinyExample] (ForkJoinPool.commonPool-worker-7) 执行完成了啊

标签:11,MutinyExample,df,dg,ForkJoinPool,示例,Mutiny,Uni,commonPool
From: https://www.cnblogs.com/xiaojiluben/p/17839472.html

相关文章

  • Java Junit单元测试(入门必看篇)
    Hii,mJinXiang⭐前言 ⭐本篇文章主要介绍单元测试工具Junit使用以及部分理论知识......
  • 使用Linux命令sort及uniq对文件或屏幕输出进行分组统计
    sortdemo.txt|uniq-c|sort-rn|head-3在日常Linux操作常常需要对一些文件或屏幕数次中重复的字段进行分组统计。实现的方法非常简单,核心命令为:sort|uniq--c|sort-rn。sort:对指定列进行排序,使该列相同的字段排练到一起uniq-c:uniq命令用于检查及删除文本文件......
  • Unity3D:ECS架构详解
    Unity3D是目前游戏开发领域最为流行的引擎之一,而Unity3DECS架构则是在Unity3D引擎的基础上,针对游戏开发中的实体组件系统进行优化和改进而设计的一种架构。本文将详细介绍Unity3DECS架构的技术细节和代码实现。对惹,这里有一个游戏开发交流小组,希望大家可以点击进来一起交流一下开......
  • 出现UnicodeDecodeError: 'utf-8' codec can't decode byte 0xbc in position 2: inva
    直接在代码第一行写下这段代码#-*-coding:utf-8-*-为什么这个有注释符号还是可以起作用?在Python中,`#-*-coding:utf-8-*-`这行代码并不是注释,而是一个特殊的声明,称为“编码声明”(encodingdeclaration)。它告诉Python解释器在载入源文件时使用UTF-8编码来解析源......
  • Communication Setup中VCDL与Python交互
     ApplicationMoudles基础代码[email protected]_scriptclassLinkToSigFile:#Calledbeforemeasurementstarttoperformnecessaryinitializations,#e.g.tocreateobjects.Duringmeasurement,fewadditionalobjects......
  • SQL(Structured Query Language)简介和常见 SQL 命令示例
    简介SQL(StructuredQueryLanguage)是一种用于访问和操作关系型数据库的标准语言。它是一个功能强大的语言,用于执行各种数据库操作,包括检索数据、插入新记录、更新记录、删除记录、创建数据库、创建新表、设置权限以及执行存储过程和视图等。以下是SQL的一些重要方面:SQL的目的......
  • SQL(Structured Query Language)简介和常见 SQL 命令示例
    简介SQL(StructuredQueryLanguage)是一种用于访问和操作关系型数据库的标准语言。它是一个功能强大的语言,用于执行各种数据库操作,包括检索数据、插入新记录、更新记录、删除记录、创建数据库、创建新表、设置权限以及执行存储过程和视图等。以下是SQL的一些重要方面:SQL的目......
  • Unity之贴图混合
    有如下几种方式:1.CPU端逐像素根据alpha通道进行叠加1publicvoidMergeTexture(Texture2Dtt1,Texture2Dtt2,intoffsetX,intoffsetY)2{3Texture2DnewTex=newTexture2D(tt1.width,tt1.height,TextureFormat.ARGB32,false);4......
  • The 2020 ICPC Asia Shenyang Regional Programming Contest M. United in Stormwind
    Preface先补一下这周一队友VP的ICPC2020沈阳,这场由于我在补作业+晚上有大物实验,因此只参与了中间一个多小时,纯口胡了几个简单题因为我没怎么参与所以过的其它题就不写补题+写博客了,毕竟队友会等于我会那么就主要把我比赛时看了但没啥思路的M补了,AI祁神好像在补那我就不管了,后面......
  • uniapp微信小程序图片裁剪插件,支持自定义尺寸、定点等比例缩放、拖动、图片翻转、剪切
    uniapp微信小程序图片裁剪插件,支持自定义尺寸、定点等比例缩放、拖动、图片翻转、剪切圆形/圆角图片、定制样式,功能多性能高体验好注释全。1.效果预览:      5.引入插件项目代码:Homilier/qf-image-cropper·GitCode使用HBuilderX导入项目:图片裁剪插件-DCloud......