首页 > 编程语言 >java协程线程之虚拟线程

java协程线程之虚拟线程

时间:2023-07-19 13:46:29浏览次数:44  
标签:runnable 协程 java System atomicInteger 线程 new nanoTime

前言

众所周知,java 是没有协程线程的,在我们如此熟知的jdk 1.8时代,大佬们想出来的办法就是异步io,甚至用并行的stream流来实现,高并发也好,缩短事件处理时间也好;大家都在想着自己认为更好的实现方式;

在来说说吧,我为什么会在今天研究这个破b玩意儿呢,

这事情还的从一个月前的版本维护说起,

目前公司游戏运营的算中规中矩吧,日新增和日活跃用户基本保持在1w,2.5w样子;

大概1-2周会有一次版本更新,需要停服维护的,

我想大部分做游戏的同僚可能都知道,游戏架构里面包含一个登录服这么一个环节,用于对账号管理以及和sdk平台做登录二次验证;

我们的问题也就出在了这sdk二次登录验证环境;

 从这个截图中不难看出,我在向sdk服务器进行验证的时候http请求耗时,一个请求多长达400ms,按照这个逻辑,一个线程一秒钟也只能是2个登录;

然后面对停服维护阶段,玩家疯狂的尝试登录,导致登录服务器直接积压了30万个登录请求等待处理;

在寻求方案的时候,看到了http请求池化方案,目前已经大线程池(这里是本人自定义线程池)和http池化(基于 Apache CloseableHttpClient)处理方案 因为平台是jdk11的

在寻求方案同时发现了jdk19开放的预览版新功能虚拟线程;翻阅了一些资料,就像这虚拟线程能不能为我带来更好性能体验,让现有的系统,吞吐量更上一层楼;

一下测试代码用的是jdk20测试

 构建虚拟线程

 第一步我们需要先创建虚拟线程,才能去理解什么是虚拟线程

1     public static void main(String[] args) throws Exception {
2 
3         Thread.startVirtualThread(() -> {
4             System.out.println(Thread.currentThread().toString());
5         });
6 
7         Thread.sleep(3000);
8     }

 

 这就正确的启动了一个虚拟线程;从线程明明输出看着是不是有点眼熟,是不是跟stream的并行流很相似;

接下来我们看看虚拟线程的运行是怎么回事,

 1    public static void main(String[] args) throws Exception {
 2 
 3         Thread.startVirtualThread(() -> {
 4             try {
 5                 Thread.sleep(5000);
 6             } catch (InterruptedException e) {
 7                 throw new RuntimeException(e);
 8             }
 9             System.out.println(Thread.currentThread().toString());
10         });
11 
12         Thread.startVirtualThread(() -> {
13             try {
14                 Thread.sleep(5000);
15             } catch (InterruptedException e) {
16                 throw new RuntimeException(e);
17             }
18             System.out.println(Thread.currentThread().toString());
19         });
20         Thread.startVirtualThread(() -> {
21             try {
22                 Thread.sleep(5000);
23             } catch (InterruptedException e) {
24                 throw new RuntimeException(e);
25             }
26             System.out.println(Thread.currentThread().toString());
27         });
28         Thread.startVirtualThread(() -> {
29             try {
30                 Thread.sleep(5000);
31             } catch (InterruptedException e) {
32                 throw new RuntimeException(e);
33             }
34             System.out.println(Thread.currentThread().toString());
35         });
36         Thread.startVirtualThread(() -> {
37             try {
38                 Thread.sleep(5000);
39             } catch (InterruptedException e) {
40                 throw new RuntimeException(e);
41             }
42             System.out.println(Thread.currentThread().toString());
43         });
44         Thread.sleep(3000);
45     }
View Code

我们多new几个虚拟线程来看看监控

 看到了吧,实际上你new的虚拟线程,其实是被当成了一个任务丢到了线程池里面在运行;

 在翻阅了现有的代码逻辑还不能定义这个底部线程池,只能使用默认的;

当然目前是预览版,不确定之后会不会可以自定义实现,stream流一样,可以定义它并行数量;

 

线程池对比

测试用例1 

 1     @Test
 2     public void r() {
 3         t1();
 4         t2();
 5     }
 6 
 7     public void t1() {
 8         AtomicInteger atomicInteger = new AtomicInteger(100);
 9         try (var executor = Executors.newFixedThreadPool(10)) {
10             long nanoTime = System.nanoTime();
11             for (int i = 0; i < 100; i++) {
12                 executor.execute(() -> {
13                     try {
14                         Thread.sleep(50);
15                     } catch (InterruptedException e) {
16                         throw new RuntimeException(e);
17                     }
18                     atomicInteger.decrementAndGet();
19                 });
20             }
21             while (atomicInteger.get() > 0) {}
22             System.out.println("平台线程 - " + atomicInteger.get() + " - " + ((System.nanoTime() - nanoTime) / 10000 / 100f));
23         }
24     }
25 
26     public void t2() {
27         AtomicInteger atomicInteger = new AtomicInteger(100);
28         try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
29             long nanoTime = System.nanoTime();
30             for (int i = 0; i < 100; i++) {
31                 executor.execute(() -> {
32                     try {
33                         Thread.sleep(50);
34                     } catch (InterruptedException e) {
35                         throw new RuntimeException(e);
36                     }
37                     atomicInteger.decrementAndGet();
38                 });
39             }
40             while (atomicInteger.get() > 0) {}
41             System.out.println("虚拟线程 - " + atomicInteger.get() + " - " + ((System.nanoTime() - nanoTime) / 10000 / 100f));
42         }
43     }
View Code

 通过这段测试代码对比,总任务耗时,显而易见性能;

测试用例2

 1     public void t2p() {
 2         Runnable runnable = () -> {
 3             long g = 0;
 4             for (int i = 0; i < 10000; i++) {
 5                 for (int j = 0; j < 10000; j++) {
 6                     for (int k = 0; k < 100; k++) {
 7                         g++;
 8                     }
 9                 }
10             }
11         };
12         AtomicInteger atomicInteger = new AtomicInteger(100);
13         try (var executor = Executors.newFixedThreadPool(10)) {
14             long nanoTime = System.nanoTime();
15             for (int i = 0; i < 100; i++) {
16                 executor.execute(() -> {
17                     runnable.run();
18                     atomicInteger.decrementAndGet();
19                 });
20             }
21             while (atomicInteger.get() > 0) {}
22             System.out.println("平台线程 - " + atomicInteger.get() + " - " + ((System.nanoTime() - nanoTime) / 10000 / 100f));
23         }
24     }
25 
26     public void t2v() {
27         Runnable runnable = () -> {
28             long g = 0;
29             for (int i = 0; i < 10000; i++) {
30                 for (int j = 0; j < 10000; j++) {
31                     for (int k = 0; k < 100; k++) {
32                         g++;
33                     }
34                 }
35             }
36         };
37         AtomicInteger atomicInteger = new AtomicInteger(100);
38         try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
39             long nanoTime = System.nanoTime();
40             for (int i = 0; i < 100; i++) {
41                 executor.execute(() -> {
42                     runnable.run();
43                     atomicInteger.decrementAndGet();
44                 });
45             }
46             while (atomicInteger.get() > 0) {}
47             System.out.println("虚拟线程 - " + atomicInteger.get() + " - " + ((System.nanoTime() - nanoTime) / 10000 / 100f));
48         }
49     }

 通过测试用例2不难看出,虚拟线程已经不占优势;

这是为什么呢?

总结

平台线程我就不过多描述因为大家都知道,网上的描述也特别多;

虚拟线程,其实我们更多可以可以考虑他只是一个任务,异步的任务;

区别在于,平台线程受制于cpu,如果你执行任务很耗时或者比如网络io等挂起等待,那么这个cpu也会一直挂起等待无法处理其他事情;

虚拟线程是异步任务凌驾于平台线程之上,也就是说,当你的虚拟线程等待挂起的时候,平台线程就去执行其他任务(其他虚拟线程)去了

我们通过上面测试用例可以这样理解,

 

用例1,通常我们的RPC服务或者SDK跟我开通SDK二次验证大部分时间处于等待挂起业务,这时候虚拟线程的作用就会非常大,他可以发起大量的验证请求,等待回答;我们通常定义的IO密集型应用;

 

用例2,属于计算型的,它会一直占用cpu时间片,不会腾出cpu去执行其他事件;我们通常说cpu密集型应用不太适用虚拟线程;

 

目前虚拟线程的执行依赖于底层线程池,我们无法自主控制它,所以不是很建议使用

关于虚拟线程的描述或者定义我就不在过多的去阐述,

我只说一下它运行的逻辑吧,

1,在不同时间段一个虚拟线程可以由不同的平台线程调度,也可以由一个平台线程调度,平台线程=系统线程=cpu

2,在不同时间段一个平台线程在可以调度不同的虚拟线程,也可以反复调度一个虚拟线程

3,在同一时间段,一个平台线程只能调用一个虚拟线程,一个虚拟线程只能由一个平台线程调度

换言之,其实虚拟线程可以看成一个task,你可以new很多的task,至于他什么时候被执行,就看你的工人(cpu)什么时候有空,

 

  1 package code.threading;
  2 
  3 import org.junit.Test;
  4 
  5 import java.util.ArrayList;
  6 import java.util.List;
  7 import java.util.concurrent.Executors;
  8 import java.util.concurrent.atomic.AtomicBoolean;
  9 import java.util.concurrent.atomic.AtomicInteger;
 10 
 11 /**
 12  * 线程测试
 13  *
 14  * @author: Troy.Chen(無心道, 15388152619)
 15  * @version: 2023-05-29 21:31
 16  **/
 17 public class ThreadCode {
 18 
 19     public static void main(String[] args) throws Exception {
 20 
 21     }
 22 
 23     @Test
 24     public void s() throws Exception {
 25 
 26         Runnable runnable = () -> {
 27             long nanoTime = System.nanoTime();
 28             long g = 0;
 29             for (int i = 0; i < 10000; i++) {
 30                 for (int j = 0; j < 10000; j++) {
 31                     for (int k = 0; k < 100; k++) {
 32                         g++;
 33                     }
 34                 }
 35             }
 36             Thread thread = Thread.currentThread();
 37             System.out.println(g + " - " + thread.isVirtual() + " - " + thread.threadId() + " - " + ((System.nanoTime() - nanoTime) / 10000 / 100f));
 38         };
 39 
 40         List<VirtualThread> ts = new ArrayList<>();
 41         ts.add(new VirtualThread(runnable));
 42         ts.add(new VirtualThread(runnable));
 43         ts.add(new VirtualThread(runnable));
 44         ts.add(new VirtualThread(runnable));
 45         ts.add(new VirtualThread(runnable));
 46         ts.add(new VirtualThread(runnable));
 47         ts.add(new VirtualThread(runnable));
 48         ts.add(new VirtualThread(runnable));
 49         ts.add(new VirtualThread(runnable));
 50         ts.add(new VirtualThread(runnable));
 51         ts.add(new VirtualThread(runnable));
 52         ts.add(new VirtualThread(runnable));
 53         ts.add(new VirtualThread(runnable));
 54         ts.add(new VirtualThread(runnable));
 55         ts.add(new VirtualThread(runnable));
 56         ts.add(new VirtualThread(runnable));
 57         ts.add(new VirtualThread(runnable));
 58         ts.add(new VirtualThread(runnable));
 59         ts.add(new VirtualThread(runnable));
 60         ts.add(new VirtualThread(runnable));
 61         for (VirtualThread t : ts) {
 62             t.shutdown();
 63         }
 64         for (VirtualThread t : ts) {
 65             t.join();
 66         }
 67     }
 68 
 69     public static class VirtualThread implements Runnable {
 70 
 71         /*虚拟线程构建器*/
 72         static final Thread.Builder.OfVirtual ofVirtual = Thread.ofVirtual().name("v-", 1);
 73 
 74         AtomicBoolean shutdown = new AtomicBoolean();
 75         Thread _thread;
 76         Runnable runnable;
 77 
 78         public VirtualThread(Runnable runnable) {
 79             this.runnable = runnable;
 80             _thread = ofVirtual.start(this);
 81         }
 82 
 83         @Override public void run() {
 84             do {
 85                 try {
 86                     try {
 87                         this.runnable.run();
 88                     } catch (Throwable e) {
 89                         e.printStackTrace();
 90                     }
 91                 } catch (Throwable throwable) {}
 92             } while (!shutdown.get());
 93             System.out.println("虚拟线程退出 " + _thread.isVirtual() + " - " + _thread.threadId() + " - " + _thread.getName());
 94         }
 95 
 96         public void shutdown() {
 97             shutdown.lazySet(true);
 98         }
 99 
100         public void join() throws InterruptedException {
101             _thread.join();
102         }
103     }
104 
105     @Test
106     public void r() {
107         t2p();
108         t2v();
109     }
110 
111     public void t1p() {
112         AtomicInteger atomicInteger = new AtomicInteger(100);
113         try (var executor = Executors.newFixedThreadPool(10)) {
114             long nanoTime = System.nanoTime();
115             for (int i = 0; i < 100; i++) {
116                 executor.execute(() -> {
117                     try {
118                         Thread.sleep(50);
119                     } catch (InterruptedException e) {
120                         throw new RuntimeException(e);
121                     }
122                     atomicInteger.decrementAndGet();
123                 });
124             }
125             while (atomicInteger.get() > 0) {}
126             System.out.println("平台线程 - " + atomicInteger.get() + " - " + ((System.nanoTime() - nanoTime) / 10000 / 100f));
127         }
128     }
129 
130     public void t1v() {
131         AtomicInteger atomicInteger = new AtomicInteger(100);
132         try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
133             long nanoTime = System.nanoTime();
134             for (int i = 0; i < 100; i++) {
135                 executor.execute(() -> {
136                     try {
137                         Thread.sleep(50);
138                     } catch (InterruptedException e) {
139                         throw new RuntimeException(e);
140                     }
141                     atomicInteger.decrementAndGet();
142                 });
143             }
144             while (atomicInteger.get() > 0) {}
145             System.out.println("虚拟线程 - " + atomicInteger.get() + " - " + ((System.nanoTime() - nanoTime) / 10000 / 100f));
146         }
147     }
148 
149     public void t2p() {
150         Runnable runnable = () -> {
151             long g = 0;
152             for (int i = 0; i < 10000; i++) {
153                 for (int j = 0; j < 10000; j++) {
154                     for (int k = 0; k < 100; k++) {
155                         g++;
156                     }
157                 }
158             }
159         };
160         AtomicInteger atomicInteger = new AtomicInteger(100);
161         try (var executor = Executors.newFixedThreadPool(10)) {
162             long nanoTime = System.nanoTime();
163             for (int i = 0; i < 100; i++) {
164                 executor.execute(() -> {
165                     runnable.run();
166                     atomicInteger.decrementAndGet();
167                 });
168             }
169             while (atomicInteger.get() > 0) {}
170             System.out.println("平台线程 - " + atomicInteger.get() + " - " + ((System.nanoTime() - nanoTime) / 10000 / 100f));
171         }
172     }
173 
174     public void t2v() {
175         Runnable runnable = () -> {
176             long g = 0;
177             for (int i = 0; i < 10000; i++) {
178                 for (int j = 0; j < 10000; j++) {
179                     for (int k = 0; k < 100; k++) {
180                         g++;
181                     }
182                 }
183             }
184         };
185         AtomicInteger atomicInteger = new AtomicInteger(100);
186         try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
187             long nanoTime = System.nanoTime();
188             for (int i = 0; i < 100; i++) {
189                 executor.execute(() -> {
190                     runnable.run();
191                     atomicInteger.decrementAndGet();
192                 });
193             }
194             while (atomicInteger.get() > 0) {}
195             System.out.println("虚拟线程 - " + atomicInteger.get() + " - " + ((System.nanoTime() - nanoTime) / 10000 / 100f));
196         }
197     }
198 }
View Code

附加一段全部测试代码

  

标签:runnable,协程,java,System,atomicInteger,线程,new,nanoTime
From: https://www.cnblogs.com/shizuchengxuyuan/p/17565269.html

相关文章

  • 影响 Java 程序的性能的因素和性能指标
    有哪些因素会影响Java程序的性能?执行速度:程序的反应是否迅速,响应时间是否足够短内存分配:内存分配是否合理,是否过多地消耗内存或者存在泄漏启动时间:程序从运行到可以正常处理业务需要花费多少时间负载承受能力:当系统压力上升时,系统的执行速度、响应时间的上升曲线是否平缓......
  • JavaScript - 支持word上传的富文本编辑器
    ​ 在之前在工作中遇到在富文本编辑器中粘贴图片不能展示的问题,于是各种网上扒拉,终于找到解决方案,在这里感谢一下知乎中众大神以及TheViper。通过知乎提供的思路找到粘贴的原理,通过TheViper找到粘贴图片的方法。其原理为一下步骤:监听粘贴事件;【用于插入图片】获取光标位置;【......
  • 浅谈Java容器
    Java容器容器类是Java以类库的形式供用户开发程序时可直接使用的各种数据结构。所谓数据结构就是以某种方式将数据组织在一起,并存储在计算机中。数据结构不仅可以存储数据,还支持访问和处理数据的操作。在面向对象思想里,一种数据结构被认为是一个容器。数组是一种简单的数据结构,......
  • Java基础入门
    一、注释方式标识符单行注释//多行注释/**/文档注释/***/二、基础1、进制进制前缀二进制0b八进制0十进制无十六进制0x2、数据类型typevarName[=value][{,varName[=value]}];bytenum1=127;shortnum2=32767;intn......
  • Java高并发之CyclicBarrier简介(转)
    原文:https://juejin.cn/post/7209617649885184058作者:xindoo来源:稀土掘金  Java中的CyclicBarrier是一种同步工具,它可以让多个线程在一个屏障处等待,直到所有线程都到达该屏障处后,才能继续执行。CyclicBarrier可以用于协调多个线程的执行,以便它们可以在某个点上同步执行......
  • JavaScript 的优雅编程技巧:Singleton Pattern
    JavaScript的优雅编程技巧:SingletonPattern定义单例模式:保证一个类仅有一个实例,并提供一个访问的全局访问点。特点仅有一个实例对象全局都可访问该实例主动实例化延迟实例化类似单例模式的使用实践jQuery,lodash,moment....电商中的购物车(因为一个用户只有一......
  • javaSe笔试题
    1 ==和equals区别 ......
  • 【Javascript】数组扩展方法:根据key重新分组
    1//数组扩展:根据key重新分组2//field:按什么字段分组3Array.prototype.GroupByKey=function(field)4{5varoriginalArr=this6lettempArr=[]7letresultData=[]8for(leti=0;i<originalArr.length;i++)9{10......
  • PerfView专题 (第十四篇): 洞察那些 C# 代码中的短命线程
    一:背景1.讲故事这篇文章源自于分析一些疑难dump的思考而产生的灵感,在dump分析中经常要寻找的一个答案就是如何找到死亡线程的生前都做了一些什么?参考如下输出:0:001>!tThreadCount:22UnstartedThread:0BackgroundThread:1PendingThread:0DeadThread:......
  • JavaScript 笔记(二)事件循环机制
    一、事件循环机制1.定义:事件循环是JavaScript中一种重要的异步执行机制。2.作用:管理和协调各种异步任务的执行顺序,保证JavaScript代码的执行顺序和预期一致。3.组成部分:3.1主线程(调用栈):执行任务;3.2任务队列:存放异步任务;3.3事件循环......