HarmonyOS Next 并发 taskpool 和 worker
总览
介绍
并发,指的是同一时间内,多段代码同时执行。在 ArkTs 编程中,并发分为异步并发和多线程并发。
异步并发
异步并发并不是真正的并发,比如在单核设备中,同时执行多端代码其实是通过 CPU 快速调度来实现的。比如一个司机,它在同一时间只
能开一辆车。做不到同时开两辆车。如果举一个极端的例子。这位司机可以看起来像在开两辆车。
我们把开车分成两个步骤:
- 上车
- 开动
如果我们把司机上车的时间极限缩小,比如为 0.00001 秒中,那么这个司机就可以做到同时开两辆车
开小米 su7
- 上车(0.00001 秒)
- 开车
马上下车,跑到另外一辆车旁边,然后
开问界 M7
- 上车(0.00001 秒)
- 开车
可以看到,只要切换任务的速度够快,就能理解成同时在开两辆车。这个就是我们的忙碌的 CPU 要做的事情了。
我们可以将 setTimeout 和 Promise/Async 都看成是 异步并发的技术就行。
多线程并发
多线程并发的模型常见的分成基于内存共享的并发模型和基于消息通信的并发模型。
Actor 并发模型作为基于消息通信并发模型的典型代表,不需要开发者去面对锁带来的一系列复杂偶发的问题,同时并发度也相对较高,
因此得到了广泛的支持和使用。
当前 ArkTS 提供了TaskPool和Worker两种并发能力,TaskPool和Worker都基于Actor并发模型实现。 TaskPool 和 Worker。
TaskPool 其实是基于 Worker 做到的一层封装。
TaskPool 和 Worker 的实现特点对比
主要是使用方法上的区别
- TaskPool 可以直接传递参数、Worker 需要自行封装参数
- TaskPool 可以直接接收返回数据,Worker 通过 onmessage 接收
- TaskPool 任务池个数上限自动管理,Worker 最多 64 个线程,具体看内存
- TaskPool 任务执行时长 同步代码上限 3 分钟,异步代码无限。Worker 无限
- TaskPool 可以设置设置任务的优先级,Workder 不支持
- TaskPool 支持取消任务,Worker 不支持
TaskPool 和 Worker 的适用场景对比
TaskPool 和 Worker 均支持多线程并发能力。由于 TaskPool 的工作线程会绑定系统的调度优先级,并且支持负载均衡(自动扩缩容),而
Worker 需要开发者自行创建,存在创建耗时以及不支持设置调度优先级,故在性能方面使用 TaskPool 会优于 Worker,因此大多数场景推
荐使用 TaskPool。
TaskPool 偏向独立任务维度,该任务在线程中执行,无需关注线程的生命周期,超长任务(大于 3 分钟且非长时任务)会被系统自动回
收;而 Worker 偏向线程的维度,支持长时间占据线程执行,需要主动管理线程生命周期。
常见的一些开发场景及适用具体说明如下:
-
运行时间超过 3 分钟(不包含 Promise 和 async/await 异步调用的耗时,例如网络下载、文件读写等 I/O 任务的耗时)的任务。例如后台
进行 1 小时的预测算法训练等 CPU 密集型任务,需要使用 Worker。
-
有关联的一系列同步任务。例如在一些需要创建、使用句柄的场景中,句柄创建每次都是不同的,该句柄需永久保存,保证使用该句
柄进行操作,需要使用 Worker。
-
需要设置优先级的任务。例如图库直方图绘制场景,后台计算的直方图数据会用于前台界面的显示,影响用户体验,需要高优先级处理,需要使用 TaskPool。
-
需要频繁取消的任务。例如图库大图浏览场景,为提升体验,会同时缓存当前图片左右侧各 2 张图片,往一侧滑动跳到下一张图片
时,要取消另一侧的一个缓存任务,需要使用 TaskPool。
-
大量或者调度点较分散的任务。例如大型应用的多个模块包含多个耗时任务,不方便使用 Worker 去做负载管理,推荐采用
TaskPool。
线程间通信对象
线程间通信指的是并发多线程间存在的数据交换行为。由于 ArkTS 语言兼容 TS/JS,其运行时的实现与其它所有的 JS 引擎一样,都是基于
Actor 内存隔离的并发模型提供并发能力。
普通对象
普通对象跨线程时通过拷贝形式传递
ArrayBuffer 对象
ArrayBuffer 内部包含一块 Native 内存。其 JS 对象壳与普通对象一样,需要经过序列化与反序列化拷贝传递,但是 Native 内存有两种传输方
式:拷贝和转移。
SharedArrayBuffer 对象
SharedArrayBuffer 内部包含一块 Native 内存,支持跨并发实例间共享,但是访问及修改需要采用Atomics类,防止数据竞争
Transferable
Transferable 对象(也称为 NativeBinding 对象)指的是一个 JS 对象,绑定了一个 C++对象,且主体功能由 C++提供
可以实现共享和转移模式
Sendable 对象
在传统 JS 引擎上,对象的并发通信开销的优化方式只有一种,就是把实现下沉到 Native 侧,通过Transferable 对象的转移或共享方式降低并发通信开销。而开发者仍然还有大量对象并发通信的诉求,这个问题在业界的 JS 引擎实现上并没有得到解决。
ArkTS 提供了 Sendable 对象类型,在并发通信时支持通过引用传递来解决上述问题。
Sendable 对象为可共享的,其跨线程前后指向同一个 JS 对象,如果其包含了 JS 或者 Native 内容,均可以直接共享,如果底层是 Native 实现
的,则需要考虑线程安全性。
多线程并发场景
针对常见的业务场景,主要可以对应分为三种并发任务:耗时任务、长时任务、常驻任务。
耗时任务并发场景简介
耗时任务指的是需要长时间执行的任务,如果在主线程执行可能导致应用卡顿掉帧、响应慢等问题。典型的耗时任务有 CPU 密集型任务、I/O 密集型任务以及同步任务。
对应耗时任务,常见的业务场景分类如下所示:
常见业务场景 | 具体业务描述 | CPU 密集型 | I/O 密集型 | 同步任务 |
---|---|---|---|---|
图片/视频编解码 | 将图片或视频进行编解码再展示。 | √ | √ | × |
压缩/解压缩 | 对本地压缩包进行解压操作或者对本地文件进行压缩操作。 | √ | √ | × |
JSON 解析 | 对 JSON 字符串的序列化和反序列化操作。 | √ | × | × |
模型运算 | 对数据进行模型运算分析等。 | √ | × | × |
网络下载 | 密集网络请求下载资源、图片、文件等。 | × | √ | × |
数据库操作 | 将聊天记录、页面布局信息、音乐列表信息等保存到数据库,或者应用二次启动时,读取数据库展示相关信息。 | × | √ | × |
长时任务并发场景简介
在应用业务实现过程中,对于需要较长时间不定时运行的任务,称为长时任务。长时任务如果放在主线程中执行会阻塞主线程的 UI 业务,出现卡顿丢帧等影响用户体验的问题。因此通常需要将这个独立的长时任务放到单独的子线程中执行。
典型的长时任务场景如下所示:
常见业务场景 | 具体业务描述 |
---|---|
定期采集传感器数据 | 周期性采集一些传感器信息(例如位置信息、速度传感器等),应用运行阶段长时间不间断运行。 |
监听 Socket 端口信息 | 长时间监听 Socket 数据,不定时需要响应处理。 |
上述业务场景均为独立的长时任务,任务执行周期长,跟外部交互简单,分发到后台线程后,需要不定期响应,以获取结果。这些类型的
任务使用 TaskPool 可以简化开发工作量,避免管理复杂的生命周期,避免线程泛滥,开发者只需要将上述独立的长时任务放入 TaskPool
队列,再等待结果即可。
常驻任务并发场景
在应用业务实现过程中,对于一些长耗时(大于 3min)且并发量不大的常驻任务场景,使用 Worker 在后台线程中运行这些耗时逻辑,避免阻塞主线程而导致出现丢帧卡顿等影响用户体验性的问题 。
常驻任务是指相比于短时任务,时间更长的任务,可能跟主线程生命周期一致。相比于长时任务,常驻任务更倾向于跟线程绑定的任务,单次运行时间更长(比如超过 3 分钟)。
对应常驻任务,较为常见的业务场景如下:
常见业务场景 | 具体业务描述 |
---|---|
游戏中台场景 | 启动子线程作为游戏业务的主逻辑线程,UI 线程只负责渲染。 |
长耗时任务场景 | 后台长时间的模型预测任务、或者硬件测试等 |
TaskPool 使用
TaskPool 基本使用 1
TaskPool 最简单的用法如下
其中需要注意的是执行任务的函数必须使用 @Concurrent 来修饰
taskpool.execute(函数, 参数);
import { taskpool } from '@kit.ArkTS'
// 任务具体逻辑
@Concurrent
function func1(n: number) {
return 1 + n
}
@Entry
@Component
struct Index {
@State
num: number = 0
fn1 = async () => {
// 执行任务
const res = await taskpool.execute(func1, this.num)
this.num = Number(res)
}
build() {
Column() {
Button("TaskPool基本使用 " + this.num)
.onClick(this.fn1)
}
.width("100%")
.height("100%")
.justifyContent(FlexAlign.Center)
}
}
TaskPool 基本使用 2
TaskPool 的基本使用还可以指定任务的优先级 如:
taskpool.execute(task, 优先级)
import { taskpool } from '@kit.ArkTS'
// 任务具体逻辑
@Concurrent
function func1(n: number) {
console.log(`任务 ${n}`)
}
@Entry
@Component
struct Index {
@State
num: number = 0
fn1 = async () => {
// 同时创建和执行10个任务 指定第10个任务是最高优先级
for (let index = 1; index <= 10; index++) {
// 创建任务,传入要执行的函数和该函数的形参
const task = new taskpool.Task(func1, index)
if (index !== 10) {
// 执行任务,并且制定优先级 taskpool.Priority.LOW
taskpool.execute(task, taskpool.Priority.LOW)
} else {
taskpool.execute(task, taskpool.Priority.HIGH)
}
}
}
build() {
Column() {
Button("TaskPool基本使用 " + this.num)
.onClick(this.fn1)
}
.width("100%")
.height("100%")
.justifyContent(FlexAlign.Center)
}
}
TaskPool 基本使用 3
TaskPool 还可以支持传入一些任务组,类似 Promise.All, 可以等待全部的耗时任务结束。
import { taskpool } from '@kit.ArkTS'
// 任务具体逻辑
@Concurrent
function func1(n: number) {
const promise = new Promise<number>(resolve => {
resolve(n)
setTimeout(() => {
}, 1000 + n * 100)
})
return promise
}
@Entry
@Component
struct Index {
fn1 = async () => {
// 创建任务组
const taskGroup = new taskpool.TaskGroup()
for (let index = 1; index <= 10; index++) {
// 创建任务,传入要执行的函数和该函数的形参
const task = new taskpool.Task(func1, index)
// 添加到任务组中
taskGroup.addTask(task)
}
// 执行任务组
const res = await taskpool.execute(taskGroup)
console.log("res", JSON.stringify(res))
}
build() {
Column() {
Button("TaskPool基本使用 执行一组任务 ")
.onClick(this.fn1)
}
.width("100%")
.height("100%")
.justifyContent(FlexAlign.Center)
}
}
TaskPool 和主线程通信
TaskPool 可以直接通过返回结果,将数据传递回来。但是如果,是持续监听的任务,那么此时可以使用 sendData
和 onReceiveData
和主线程进行通信
import { taskpool } from '@kit.ArkTS'
// 任务具体逻辑
@Concurrent
function func1(n: number) {
// 假设这里是持续发送的数据
taskpool.Task.sendData((new Date).toLocaleString())
return 100
}
@Entry
@Component
struct Index {
fn1 = async () => {
const task = new taskpool.Task(func1, 100)
// 主线程监听到的数据
task.onReceiveData((result: string) => {
console.log("主线程监听到的数据", result)
})
// 直接得到func1的结果
const res = await taskpool.execute(task)
console.log("直接返回的结果", res)
}
build() {
Column() {
Button("TaskPool基本使用 和主线程通信 ")
.onClick(this.fn1)
}
.width("100%")
.height("100%")
.justifyContent(FlexAlign.Center)
}
}
TaskPool 使用限制
TaskPool 使用过程中,不能直接使用 TaskPool 所在同一文件中的变量、函数或者类,否则会报错误
在使用 @Concurrent 修饰的函数中,只能使用导入的变量和局部变量。<ArkTSCheck>
Only imported variables and local variables can be used in @Concurrent decorated functions. <ArkTSCheck>
解决方案是 导出使用
Worker 使用
Worker 主要作用是为应用程序提供一个多线程的运行环境,可满足应用程序在执行过程中与主线程分离,在后台线程中运行一个脚本进
行耗时操作,极大避免类似于计算密集型或高延迟的任务阻塞主线程的运行
workder 主要通过
- onmessage 监听数据
- postMessage 发送数据
1. 新建 workder 文件
在你的模块上鼠标右键新建 worker 模块即可
entry/src/main/ets/workers/Worker.ets
import {
ErrorEvent,
MessageEvents,
ThreadWorkerGlobalScope,
worker,
} from "@kit.ArkTS";
const workerPort: ThreadWorkerGlobalScope = worker.workerPort;
workerPort.onmessage = (e: MessageEvents) => {
// 接收数据
const data = e.data as Record<string, number>;
// 发送给主线程
console.log("子线程监听到了信息,并且发送信息给主线程");
setTimeout(() => {
workerPort.postMessage(data.a + data.b);
}, 1000);
};
workerPort.onmessageerror = (e: MessageEvents) => {};
workerPort.onerror = (e: ErrorEvent) => {};
2. 主线程使用 woker 进行通信
import { MessageEvents, worker } from '@kit.ArkTS';
// 1 声明workder
let workerStage1: worker.ThreadWorker | null
@Entry
@Component
struct Index {
build() {
Column() {
Button("创建")
.onClick(() => {
// 2. 创建worker
workerStage1 = new worker.ThreadWorker('entry/ets/workers/Worker.ets');
})
Button("监听数据")
.onClick(() => {
// 3 监听workder接收到的信息
workerStage1!.onmessage = (e: MessageEvents) => {
console.log("总和", e.data)
}
}
)
Button("发送数据")
.onClick(() => {
// 4 向 workder发送信息
workerStage1!.postMessage({ a: 10, b: 20 })
})
}
.width("100%")
.height("100%")
.justifyContent(FlexAlign.Center)
}
}