在之前的文章中,简单描述了将spring boot 2.x升级spring boot 3.1的版本的过程。
本文将简单介绍如何在spring 中引入虚拟线程,在文章最后会放上一些关于虚拟线程的官方参考资料。
JDK 22会引来重要特性,Virtual Threads也就是协程功能。
与主流的async、await方案(C#、JS等语言)相比,Java属于stackfull coroutine有栈协程。
Java的虚拟线程API和旧版线程有良好的兼容性,升级成本非常低,还引入了结构化并发等多种工具类辅助开发人员更好的编程。
(再也不用羡慕go语言的go func(){} 了,现在Java也能只需要 Thread.startVirtualThread(() -> { }) )
在Spring中使用,你至少需要Spring 6以及版本。
如果是Maven构建的项目,需要加入预览的编译选项。虚拟线程是JDK 22的才会正式发布,大概是在今年(2023年)9月发布。
<build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>21</source> <target>21</target> <compilerArgs> --enable-preview </compilerArgs> </configuration> </plugin> </plugins> </build>
增加配置类:
@EnableAsync @Configurationpublic class ThreadConfig { @Bean public AsyncTaskExecutor applicationTaskExecutor() { return new TaskExecutorAdapter(Executors.newVirtualThreadPerTaskExecutor()); } @Bean public TomcatProtocolHandlerCustomizer<?> protocolHandlerVirtualThreadExecutorCustomizer() { return protocolHandler -> { protocolHandler.setExecutor(Executors.newVirtualThreadPerTaskExecutor()); }; } }
前往控制器测试,打印出来就是虚拟线程了:
@GetMapping("/name") public String getThreadName() {
//VirtualThread[#171]/runnable@ForkJoinPool-1-worker-4 return Thread.currentThread().toString(); } }
试一下JDK新提供的结构化并发工具包
package com.mycode; import jdk.incubator.concurrent.StructuredTaskScope; import java.time.Duration; import java.time.Instant; import java.util.Collections; import java.util.concurrent.*; import java.util.stream.Collectors; /** * https://download.java.net/java/early_access/loom/docs/api/java.base/java/lang/Thread.html * https://download.java.net/java/early_access/loom/docs/api/jdk.incubator.concurrent/jdk/incubator/concurrent/StructuredTaskScope.html */ public class StructuredConcurrency { static int taskNo = 10; static Callable<Integer> task = () -> { System.out.println(Thread.currentThread()); Thread.sleep(Duration.ofSeconds(1)); return 42; }; static void simpleScope() throws InterruptedException { try (var scope = new StructuredTaskScope<>()) { scope.fork(task); scope.join(); } } static void nestedScope() throws InterruptedException { try (var scopeOne = new StructuredTaskScope<>()) { Collections.nCopies(10, task).forEach(scopeOne::fork); try (var scopeTwo = new StructuredTaskScope<>()) { var scopeTwoTasks = Collections.nCopies(taskNo, task) .stream().map(scopeTwo::fork).toList(); scopeTwo.joinUntil(Instant.now().plusSeconds((long) (scopeTwoTasks.size() * 1.5))); } catch (TimeoutException e) { throw new RuntimeException(e); } scopeOne.join(); } } static void complexScope() { try (var scopeGlobal = new StructuredTaskScope<>("platform", Thread.ofPlatform().factory())) { var foo = scopeGlobal.fork(() -> { try (var scopeOne = new StructuredTaskScope.ShutdownOnFailure()) { var listOfFutures = Collections.nCopies(taskNo, task) .stream().map(scopeOne::fork).toList(); scopeOne.joinUntil(Instant.now().plusSeconds((long) (listOfFutures.size() * 1.5))); var results = listOfFutures.stream().map(Future::resultNow).toList(); System.out.println("completed in scopeOne"); return results; } catch (Throwable e) { throw new RuntimeException(e); } }); while (!foo.isDone()) Thread.sleep(10); var bar = scopeGlobal.fork(() -> foo.resultNow().parallelStream() .collect(Collectors.groupingBy(Integer::valueOf, Collectors.counting()))); scopeGlobal.join(); bar.resultNow(); } catch (InterruptedException e) { throw new RuntimeException(e); } } static void unboundForkInScope() { try (var scopeJoinUntil = new StructuredTaskScope<>()) { Collections.nCopies(15, task) .parallelStream().forEach(scopeJoinUntil::fork); scopeJoinUntil.joinUntil(Instant.now().plusSeconds(taskNo)); } catch (WrongThreadException | TimeoutException | InterruptedException e) { e.printStackTrace(); } } static void boundStream() throws InterruptedException { try (var scope = new StructuredTaskScope<Integer>()) { for (Callable<Integer> integerCallable : Collections.nCopies(100, task)) { scope.fork(integerCallable); } scope.join(); } } static void _streamInScope(boolean runParallelStream) { try (var scopeJoinUntil = new StructuredTaskScope<>()) { scopeJoinUntil.fork(() -> { var stream = Collections.nCopies(taskNo, task).stream(); if (runParallelStream) { stream = stream.parallel(); } return stream.map(scopeJoinUntil::fork).toList(); }); scopeJoinUntil.join(); } catch (WrongThreadException | InterruptedException e) { e.printStackTrace(); } } static void syncStreamInScope() { _streamInScope(false); } static void parallelStreamInScope() { _streamInScope(true); } public static void main(String[] args) throws InterruptedException { nestedScope(); simpleScope(); complexScope(); //unboundForkInScope(); Runnable runnable = () -> { System.out.println("run"); }; // Start a daemon thread to run a task Thread thread1 = Thread.ofPlatform().daemon().start(runnable); // Create an unstarted thread with name "duke", its start() method // must be invoked to schedule it to execute. Thread thread2 = Thread.ofPlatform().name("duke").unstarted(runnable); // A ThreadFactory that creates daemon threads named "worker-0", "worker-1", ... ThreadFactory factory = Thread.ofPlatform().daemon().name("worker-", 0).factory(); // Start a virtual thread to run a task Thread thread3 = Thread.ofVirtual().start(runnable); // A ThreadFactory that creates virtual threads ThreadFactory factory1 = Thread.ofVirtual().factory(); Thread.Builder.OfVirtual virtualThreadBuilder = Thread.ofVirtual() // 协程名称 .name("fiber-1") // start > 0的情况下会覆盖name属性配置 .name("fiber-", 1L) // 是否启用ThreadLocal .allowSetThreadLocals(false) // 是否启用InheritableThreadLocal .inheritInheritableThreadLocals(false) // 设置未捕获异常处理器 .uncaughtExceptionHandler((t, e) -> { }); Thread.Builder.OfPlatform platformThreadBuilder = Thread.ofPlatform() .daemon(true) .group(Thread.currentThread().getThreadGroup()) .name("thread-1") .name("thread-", 1L) .allowSetThreadLocals(false) .inheritInheritableThreadLocals(false) .priority(1) .stackSize(10) .uncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { @Override public void uncaughtException(Thread t, Throwable e) { } }); } // 创建平台线程建造器,对应于Thread实例 public static Thread.Builder.OfPlatform ofPlatform() { return null; } // 创建虚拟线程建造器,对应于VirtualThread public static Thread.Builder.OfVirtual ofVirtual() { return null; } }
参考资料:
Working with Virtual Threads in Spring 6 | Baeldung
JDK新API结构并并发工具类相关文档:
结构性并发: https://openjdk.org/jeps/453
范围值(类似ThreadLocal): https://openjdk.org/jeps/446
标签:线程,Java,Thread,java,Spring,static,new,var,协程 From: https://www.cnblogs.com/sunyl/p/17495134.html