以下代码片段为模拟一个这样的操作:
在多台服务器上下载文件列表内的文件。其中,获取服务器、获取文件列表、在服务器执行下载操作均为阻塞方法。
import cn.hutool.core.collection.CollUtil;
import io.quarkus.test.junit.QuarkusTest;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.tuples.Tuple2;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.function.Function;
@QuarkusTest
public class MutinyExample {
private static final Logger log = LoggerFactory.getLogger(MutinyExample.class);
@Test
void test() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
execute()
// 为方便测试,收到完成事件的时候做些处理
.onCompletion().call(() -> {
log.info("执行完成了啊");
// 收到完成事件就可以释放了
latch.countDown();
return Uni.createFrom().voidItem();
})
// 一定要订阅,订阅才能触发执行
.subscribe().with(log::info);
// 阻塞住,防止测试退出看不到效果
latch.await();
}
public Multi<String> execute() {
// 响应式获取文件列表,这里是把阻塞操作变成响应式操作
Uni<List<String>> fileListUni = Uni.createFrom().completionStage(CompletableFuture.supplyAsync(() -> getFileList()));
// 响应式获取服务器列表
Uni<List<String>> clusterListUni = Uni.createFrom().completionStage(CompletableFuture.supplyAsync(() -> getClusterList()));
return Uni
// 将两个数据流合并成一个数据流
.combine().all().unis(fileListUni, clusterListUni)
// 放到tuple里面
.asTuple()
// 我想在这里遍历服务器执行命令,所以我要把Uni转成一个Multi<服务器>这种格式
.onItem().transformToMulti((Function<Tuple2<List<String>, List<String>>, Multi<String>>) objects -> {
List<String> fileList = objects.getItem1();
List<String> clusterList = objects.getItem2();
// 创建Multi<服务器>
return Multi.createFrom().items(clusterList.stream())
// 异步执行一个阻塞操作,遍历服务器执行一段命令
.onItem().call(server -> {
CompletableFuture<?> future = CompletableFuture.runAsync(() -> {
log.info("执行阻塞操作");
// 在这里执行命令操作
log.info("{} 下载文件: {}", server, fileList);
// 模拟阻塞
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
log.info("{} 下载文件完成", server);
});
// 这里返回什么其实并不重要,call并不关心,call只要知道操作执行完了就行了
return Uni.createFrom().completionStage(future);
});
});
}
/**
* 获取文件列表
* 这是一个阻塞操作
*/
private List<String> getFileList() {
log.info("获取文件列表");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
log.info("获取文件列表完成");
return CollUtil.newArrayList("file1", "file2");
}
/**
* 获取服务器列表
* 这是一个阻塞操作
*/
private List<String> getClusterList() {
log.info("获取服务器列表");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
log.info("获取服务器列表完成");
return CollUtil.newArrayList("cluster1", "cluster2");
}
}
输出结果
2023-11-17 18:31:58,805 INFO [com.dg.df.rda.dep.MutinyExample] (ForkJoinPool.commonPool-worker-7) 获取文件列表
2023-11-17 18:31:58,805 INFO [com.dg.df.rda.dep.MutinyExample] (ForkJoinPool.commonPool-worker-6) 获取服务器列表
2023-11-17 18:31:59,808 INFO [com.dg.df.rda.dep.MutinyExample] (ForkJoinPool.commonPool-worker-6) 获取服务器列表完成
2023-11-17 18:31:59,810 INFO [com.dg.df.rda.dep.MutinyExample] (ForkJoinPool.commonPool-worker-7) 获取文件列表完成
2023-11-17 18:31:59,818 INFO [com.dg.df.rda.dep.MutinyExample] (ForkJoinPool.commonPool-worker-6) 执行阻塞操作
2023-11-17 18:31:59,821 INFO [com.dg.df.rda.dep.MutinyExample] (ForkJoinPool.commonPool-worker-6) cluster1 下载文件: [file1, file2]
2023-11-17 18:32:00,823 INFO [com.dg.df.rda.dep.MutinyExample] (ForkJoinPool.commonPool-worker-6) cluster1 下载文件完成
2023-11-17 18:32:00,824 INFO [com.dg.df.rda.dep.MutinyExample] (ForkJoinPool.commonPool-worker-6) cluster1
2023-11-17 18:32:00,825 INFO [com.dg.df.rda.dep.MutinyExample] (ForkJoinPool.commonPool-worker-7) 执行阻塞操作
2023-11-17 18:32:00,825 INFO [com.dg.df.rda.dep.MutinyExample] (ForkJoinPool.commonPool-worker-7) cluster2 下载文件: [file1, file2]
2023-11-17 18:32:01,848 INFO [com.dg.df.rda.dep.MutinyExample] (ForkJoinPool.commonPool-worker-7) cluster2 下载文件完成
2023-11-17 18:32:01,869 INFO [com.dg.df.rda.dep.MutinyExample] (ForkJoinPool.commonPool-worker-7) cluster2
2023-11-17 18:32:01,869 INFO [com.dg.df.rda.dep.MutinyExample] (ForkJoinPool.commonPool-worker-7) 执行完成了啊
标签:11,MutinyExample,df,dg,ForkJoinPool,示例,Mutiny,Uni,commonPool
From: https://www.cnblogs.com/xiaojiluben/p/17839472.html