鸿蒙OS 分布式任务调度概述
在 HarmonyO S中,分布式任务调度平台对搭载 HarmonyOS 的多设备构筑的“超级虚拟终端”提供统一的组件管理能力,为应用定义统一的能力基线、接口形式、数据结构、服务描述语言,屏蔽硬件差异;支持远程启动、远程调用、业务无缝迁移等分布式任务。 | |
---|---|
分布式任务调度平台在底层实现 Ability(分布式任务调度的基本组件)跨设备的启动/关闭、连接及断开连接以及迁移等能力,实现跨设备的组件管理:
启动和关闭:向开发者提供管理远程 Ability 的能力,即支持启动 Page 模板的 Ability,以及启动、关闭 Service 和 Data 模板的 Ability。
连接和断开连接:向开发者提供跨设备控制服务( Service 和 Data 模板的 Ability )的能力,开发者可以通过与远程服务连接及断开连接实现获取或注销跨设备管理服务的对象,达到和本地一致的服务调度。
迁移能力:向开发者提供跨设备业务的无缝迁移能力,开发者可以通过调用 Page 模板 Ability 的迁移接口,将本地业务无缝迁移到指定设备中,打通设备间壁垒。
鸿蒙OS 发布式任务调度开发指导
场景介绍
开发者在应用中集成分布式调度能力,通过调用指定能力的分布式接口,实现跨设备能力调度。根据 Ability 模板及意图的不同,分布式任务调度向开发者提供以下六种能力:启动远程 FA、启动远程 PA、关闭远程 PA、连接远程 PA、断开连接远程 PA 和 FA 跨设备迁移。 | |
---|---|
连接远程PA
connectAbility(Intent intent, IAbilityConnection conn)接口提供连接指定设备上 PA 的能力,Intent 中指定待连接 PA 的设备 deviceId、bundleName 和 abilityName。当连接成功后,通过在 conn 定义的 onAbilityConnectDone 回调中获取对端 PA 的服务代理,两者的连接关系则由 conn 维护。具体的参数定义如下表所示:
启动远程FA/PA
startAbility(Intent intent) 接口提供启动指定设备上 FA 和 PA 的能力,Intent 中指定待启动 FA/PA 的设备 deviceId、bundleName 和 abilityName。具体参数定义如下表所示:
断开远程 PA 连接:disconnectAbility (IAbilityConnection conn)。
关闭远程 PA:boolean stopAbility (Intent intent)。
迁移FA
continueAbility(String deviceId)接口提供将本地FA迁移到指定设备上的能力,需要开发者在调用时指定目标设备的 deviceId。具体参数定义如下表所示:
开发步骤
导入功能依赖的包。
// 以下依赖包含分布式调度平台开放的接口,用于:连接/断开连接远程 PA、启动远程 FA、通过连接关系注册的回调函数 onAbilityConnectDon e中返回的对端 PA 的代理,实现对PA的控制
import ohos.aafwk.ability.AbilitySlice;
import ohos.aafwk.ability.IAbilityConnection;
import ohos.aafwk.content.Intent;
import ohos.aafwk.content.Operation;
import ohos.bundle.ElementName;
// 为了实现迁移能力,需要引入传递迁移所需数据的包以及实现迁移能力的接口。
import ohos.aafwk.ability.IAbilityContinuation;
import ohos.aafwk.content.IntentParams;
// 为了实现跨设备指令及数据通信,需要集成 HarmonyOS 提供的 RPC 接口
import ohos.rpc.IRemoteObject;
import ohos.rpc.IRemoteBroker;
import ohos.rpc.MessageParcel;
import ohos.rpc.MessageOption;
import ohos.rpc.RemoteException;
import ohos.rpc.RemoteObject;
//(可选)多设备场景下涉及设备选择,为此需要引入组网设备发现的能力
import ohos.distributedschedule.interwork.DeviceInfo;
import ohos.distributedschedule.interwork.DeviceManager;
// (可选)设计界面相关的包函数,对 FA 界面及按钮进行绘制
import ohos.agp.components.Button;
import ohos.agp.components.Component;
import ohos.agp.components.Component.ClickedListener;
import ohos.agp.components.ComponentContainer.LayoutConfig;
import ohos.agp.components.element.ShapeElement;
import ohos.agp.components.PositionLayout;
(可选)编写一个基本的 FA 用于使用分布式能力。
// 调用 AbilitySlice 模板实现一个用于控制基础功能的 FA
// Ability 和 AbilitySlice 类均需要实现 IAbilityContinuation 及其方法,才可以实现 FA 迁移。AbilitySlice 的代码示例如下
public class SampleSlice extends AbilitySlice implements IAbilityContinuation {
@Override
public void onStart(Intent intent) {
super.onStart(intent);
// 开发者可以自行进行界面设计
// 为按钮设置统一的背景色
// 例如通过PositionLayout指定大小可以实现简单界面
PositionLayout layout = new PositionLayout(this);
LayoutConfig config = new LayoutConfig(LayoutConfig.MATCH_PARENT, LayoutConfig.MATCH_PARENT);
layout.setLayoutConfig(config);
ShapeElement buttonBg = new ShapeElement();
buttonBg.setRgbColor(new RgbColor(0,125,255));
addComponents(layout, buttonBg, config);
super.setUIContent(layout);
}
@Override
public void onInactive() {
super.onInactive();
}
@Override
public void onActive() {
super.onActive();
}
@Override
public void onBackground() {
super.onBackground();
}
@Override
public void onForeground(Intent intent) {
super.onForeground(intent);
}
@Override
public void onStop() {
super.onStop();
}
}
此步骤展示了一个简单 FA 的实现过程,实际开发中请开发者根据需要进行设计。
(可选)为不同的能力设置相应的控制按钮。
// 建议开发者按照自己的界面进行按钮设计
// 开发者可以自行实现如 createButton 的方法,新建一个显示文字 text,背景色为 buttonBg 以及大小尺寸位置符合 config 设置的按钮,用来与用户发生交互
// private Button createButton(String text, ShapeElement buttonBg, LayoutConfig config)
// 按照顺序在 PositionLayout 中依次添加按钮的示例
private void addComponents(PositionLayout linear, ShapeElement buttonBg, LayoutConfig config) {
// 构建远程启动FA的按钮
btnStartRemoteFA = createButton("StartRemoteFA", buttonBg, config);
btnStartRemoteFA.setClickedListener(mStartRemoteFAListener);
linear.addComponent(btnStartRemoteFA);
// 构建远程启动PA的按钮
btnStartRemotePA = createButton("StartRemotePA", buttonBg, config);
btnStartRemotePA.setClickedListener(mStartRemotePAListener);
linear.addComponent(btnStartRemotePA);
// 构建远程关闭PA的按钮
btnStopRemotePA = createButton("StopRemotePA", buttonBg, config);
btnStopRemotePA.setClickedListener(mStopRemotePAListener);
linear.addComponent(btnStopRemotePA);
// 构建连接远程PA的按钮
btnConnectRemotePA = createButton("ConnectRemotePA", buttonBg, config);
btnConnectRemotePA.setClickedListener(mConnectRemotePAListener);
linear.addComponent(btnConnectRemotePA);
// 构建控制连接PA的按钮
btnControlRemotePA = createButton("ControlRemotePA", buttonBg, config);
btnControlRemotePA.setClickedListener(mControlPAListener);
linear.addComponent(btnControlRemotePA);
// 构建与远程PA断开连接的按钮
btnDisconnectRemotePA = createButton("DisconnectRemotePA", buttonBg, config);
btnDisconnectRemotePA.setClickedListener(mDisconnectRemotePAListener);
linear.addComponent(btnDisconnectRemotePA);
// 构建迁移FA的按钮
btnContinueRemoteFA = createButton("ContinueRemoteFA", buttonBg, config);
btnContinueRemoteFA.setClickedListener(mContinueAbilityListener);
linear.addComponent(btnContinueRemoteFA);
}
此处只展示了基于按钮控制的能力调度方法,实际开发中请开发者根据需要选择能力调度方式。
通过设备管理 DeviceManager 提供的 getDeviceList 接口获取设备列表,用于指定目标设备。
// ISelectResult 是一个自定义接口,用来处理指定设备 deviceId 后执行的行为
interface ISelectResult {
void onSelectResult(String deviceId);
}
// 获得设备列表,开发者可在得到的在线设备列表中选择目标设备执行操作
private void scheduleRemoteAbility(ISelectResult listener) {
// 调用DeviceManager的getDeviceList接口,通过FLAG_GET_ONLINE_DEVICE标记获得在线设备列表
List<DeviceInfo> onlineDevices = DeviceManager.getDeviceList(DeviceInfo.FLAG_GET_ONLINE_DEVICE);
// 判断组网设备是否为空
if (onlineDevices.isEmpty()) {
listener.onSelectResult(null);
return;
}
int numDevices = onlineDevices.size();
ArrayList<String> deviceIds = new ArrayList<>(numDevices);
ArrayList<String> deviceNames = new ArrayList<>(numDevices);
onlineDevices.forEach((device) -> {
deviceIds.add(device.getDeviceId());
deviceNames.add(device.getDeviceName());
});
// 以选择首个设备作为目标设备为例
// 开发者也可按照具体场景,通过别的方式进行设备选择
String selectDeviceId = deviceIds.get(0);
listener.onSelectResult(selectDeviceId);
}
上述实例中涉及对在线组网设备的查询,该项能力需要开发者在对应的 config.json 中声明获取设备列表及设备信息的权限,如下所示:
{
"reqPermissions": [
{
"name": "ohos.permission.DISTRIBUTED_DEVICE_STATE_CHANGE"
},
{
"name": "ohos.permission.GET_DISTRIBUTED_DEVICE_INFO"
},
{
"name": "ohos.permission.GET_BUNDLE_INFO"
}
]
}
为启动远程 FA 的按钮设置点击回调,实现启动远程 FA 的能力。
// 启动一个指定 bundleName 和 abilityName 的 FA
private ClickedListener mStartRemoteFAListener = new ClickedListener() {
@Override
public void onClick(Component arg0) {
// 启动远程PA
scheduleRemoteAbility(new ISelectResult() {
@Override
void onSelectResult(String deviceId) {
if (deviceId != null) {
Intent intent = new Intent();
// 通过scheduleRemoteAbility指定目标设备deviceId
// 指定待启动FA的bundleName和abilityName
// 例如:bundleName = "com.huawei.helloworld"
// abilityName = "com.huawei.helloworld.SampleFeatureAbility"
// 设置分布式标记,表明当前涉及分布式能力
Operation operation = new Intent.OperationBuilder()
.withDeviceId(deviceId)
.withBundleName(bundleName)
.withAbilityName(abilityName)
.withFlags(Intent.FLAG_ABILITYSLICE_MULTI_DEVICE)
.build();
intent.setOperation(operation);
// 通过AbilitySlice包含的startAbility接口实现跨设备启动FA
startAbility(intent);
}
}
});
}
};
为启动和关闭 PA 定义回调,实现启动和关闭 PA 的能力。
对于 PA 的启动、关闭、连接等操作都需要开发者提供目标设备的 deviceId
。开发者可以通过 DeviceManager 相关接口得到当前组网下的设备列表,并以弹窗的形式供用户选择,也可以按照实际需要实现其他个性化的处理方式。在点击事件回调函数中,需要开发者指定得到 deviceId 后的处理逻辑,即实现类似上例中 listener.onSelectResult(String deviceId) 的方法,代码示例如下:
// 启动远程 PA
private ClickedListener mStartRemotePAListener = new ClickedListener() {
@Override
public void onClick(Component arg0) {
// 启动远程PA
scheduleRemoteAbility(new ISelectResult() {
@Override
void onSelectResult(String deviceId) {
if (deviceId != null) {
Intent intentToStartPA = new Intent();
// bundleName和abilityName与待启动PA对应
// 例如:bundleName = "com.huawei.helloworld"
// abilityName = "com.huawei.helloworld.SampleParticleAbility"
Operation operation = new Intent.OperationBuilder()
.withDeviceId(deviceId)
.withBundleName(bundleName)
.withAbilityName(abilityName)
.withFlags(Intent.FLAG_ABILITYSLICE_MULTI_DEVICE)
.build();
intentToStartPA.setOperation(operation);
startAbility(intentToStartPA);
}
}
});
}
};
// 关闭远程 PA,和启动类似开发者需要指定待关闭 PA 对应的 bundleName 和 abilityName
private ClickedListener mStopRemotePAListener = new ClickedListener() {
@Override
public void onClick(Component arg0) {
scheduleRemoteAbility(new ISelectResult() {
@Override
void onSelectResult(String deviceId) {
if (deviceId != null) {
Intent intentToStopPA = new Intent();
// bundleName和abilityName与待关闭PA对应
// 例如:bundleName = "com.huawei.helloworld"
// abilityName = "com.huawei.helloworld.SampleParticleAbility"
Operation operation = new Intent.OperationBuilder()
.withDeviceId(deviceId)
.withBundleName(bundleName)
.withAbilityName(abilityName)
.withFlags(Intent.FLAG_ABILITYSLICE_MULTI_DEVICE)
.build();
intentToStopPA.setOperation(operation);
stopAbility(intentToStopPA);
}
}
});
}
};
启动和关闭的行为类似,开发者只需在 Intent 中指定待调度 PA 的 deviceId、bundleName 和 abilityName,并以 operation 的形式封装到 Intent 内。通过 AbilitySlice(Ability)包含的 startAbility()和 stopAbility()接口即可实现相应功能。
设备 A 连接设备 B 侧的 PA,利用连接关系调用该 PA 执行特定任务,以及断开连接。
// 当连接完成时,用来提供管理已连接 PA 的能力
private MyRemoteProxy mProxy = null;
// 用于管理连接关系
private IAbilityConnection conn = new IAbilityConnection() {
@Override
public void onAbilityConnectDone(ElementName element, IRemoteObject remote, int resultCode) {
// 跨设备PA连接完成后,会返回一个序列化的IRemoteObject对象
// 通过该对象得到控制远端服务的代理
mProxy = new MyRemoteProxy(remote);
btnConnectRemotePA.setText("connectRemoteAbility done");
}
@Override
public void onAbilityDisconnectDone(ElementName element, int resultCode) {
// 当已连接的远端PA关闭时,会触发该回调
// 支持开发者按照返回的错误信息进行PA生命周期管理
disconnectAbility(conn);
}
};
仅通过启动/关闭两种方式对 PA 进行调度无法应对需长期交互的场景,因此,分布式任务调度平台向开发者提供了跨设备PA连接及断开连接的能力。为了对已连接 PA 进行管理,开发者需要实现一个满足 IAbilityConnection 接口的连接状态检测实例,通过该实例可以对连接及断开连接完成时设置具体的处理逻辑,例如:获取控制对端 PA 的代理等。进一步为了使用该代理跨设备调度 PA,开发者需要在本地及对端分别实现对外接口一致的代理。一个具备加法能力的代理示例如下:
// 以连接提供加法计算能力的 PA 为例。为了提供跨设备连接能力,需要在本地发起连接侧和对端被连接侧分别实现代理。
// 发起连接侧的代理示例如下
public class MyRemoteProxy implements IRemoteBroker{
private static final int ERR_OK = 0;
private static final int COMMAND_PLUS = IRemoteObject.MIN_TRANSACTION_ID;
private final IRemoteObject remote;
public MyRemoteProxy(
/* [in] */ IRemoteObject remote) {
this.remote = remote;
}
@Override
public IRemoteObject asObject() {
return remote;
}
public int plus(
/* [in] */ int a,
/* [in] */ int b) throws RemoteException {
MessageParcel data = MessageParcel.obtain();
MessageParcel reply = MessageParcel.obtain();
// option不同的取值,决定采用同步或异步方式跨设备控制PA
// 本例需要同步获取对端PA执行加法的结果,因此采用同步的方式,即MessageOption.TF_SYNC
// 具体MessageOption的设置,可参考相关API文档
MessageOption option = new MessageOption(MessageOption.TF_SYNC);
data.writeInt(a);
data.writeInt(b);
try {
remote.sendRequest(COMMAND_PLUS, data, reply, option);
int ec = reply.readInt();
if (ec != ERR_OK) {
throw new RemoteException();
}
int result = reply.readInt();
return result;
} catch (RemoteException e) {
throw new RemoteException();
} finally {
data.reclaim();
reply.reclaim();
}
}
}
此外,对端待连接的 PA 需要实现对应的客户端,代码示例如下所示:
// 以计算加法为例,对端实现的客户端如下
public class MyRemote extends RemoteObject implements IRemoteBroker{
private static final int ERR_OK = 0;
private static final int ERROR = -1;
private static final int COMMAND_PLUS = IRemoteObject.MIN_TRANSACTION_ID;
public MyRemote() {
super("MyService_Remote");
}
@Override
public IRemoteObject asObject() {
return this;
}
@Override
public boolean onRemoteRequest(int code, MessageParcel data, MessageParcel reply, MessageOption option) {
if (code != COMMAND_PLUS) {
reply.writeInt(ERROR);
return false;
}
int value1 = data.readInt();
int value2 = data.readInt();
int sum = value1 + value2;
reply.writeInt(ERR_OK);
reply.writeInt(sum);
return true;
}
}
对端除了要实现如上所述的客户端外,待连接的 PA 还需要作如下修改:
// 为了返回给连接方可调用的代理,需要在该 PA 中实例化客户端,例如作为该 PA 的成员变量
private MyProxy remote = new MyProxy();
// 当该 PA 接收到连接请求时,即将该客户端转化为代理返回给连接发起侧
@Override
protected IRemoteObject onConnect(Intent intent) {
super.onConnect(intent);
return remote.asObject();
}
完成上述步骤后,可以通过点击事件实现连接、利用连接关系控制 PA 以及断开连接等行为,代码示例如下:
// 连接远程PA
private ClickedListener mConnectRemotePAListener = new ClickedListener() {
@Override
public void onClick(Component arg0) {
scheduleRemoteAbility(new ISelectResult() {
@Override
void onSelectResult(String deviceId) {
if (deviceId != null) {
Intent connectPAIntent = new Intent();
// bundleName和abilityName与待连接的PA一一对应
// 例如:bundleName = "com.huawei.helloworld"
// abilityName = "com.huawei.helloworld.SampleParticleAbility"
Operation operation = new Intent.OperationBuilder()
.withDeviceId(deviceId)
.withBundleName(bundleName)
.withAbilityName(abilityName)
.withFlags(Intent.FLAG_ABILITYSLICE_MULTI_DEVICE)
.build();
connectPAIntent.setOperation(operation);
connectAbility(connectPAIntent, conn);
}
}
});
}
};
// 控制已连接PA执行加法
private ClickedListener mControlPAListener = new ClickedListener() {
@Override
public void onClick(Component arg0) {
if (mProxy != null) {
int ret = -1;
try {
ret = mProxy.plus(10, 20);
} catch (RemoteException e) {
e.printStackTrace();
}
btnControlRemotePA.setText("ControlRemotePA result = " + ret);
}
}
};
// 与远程PA断开连接
private ClickedListener mDisconnectRemotePAListener = new ClickedListener() {
@Override
public void onClick(Component arg0) {
// 按钮复位
btnConnectRemotePA.setText("ConnectRemotePA");
btnControlRemotePA.setText("ControlRemotePA");
disconnectAbility(conn);
}
};
通过连接/断开连接远程 PA,与跨设备 PA 建立长期的管理关系。例如在本例中,通过连接关系得到远程 PA 的控制代理后,实现跨设备计算加法并将结果返回到本地显示。在实际开发中,开发者可以根据需要实现多种分布式场景,例如:跨设备位置/电量等信息的采集、跨设备计算资源互助等。
设备 A 将运行时的 FA 迁移到设备 B,实现业务在设备间无缝迁移
// 跨设备迁移FA
// 本地FA设置当前运行任务
private ClickedListener mContinueAbilityListener = new ClickedListener() {
@Override
public void onClick(Component arg0) {
// 用户选择设备后实现业务迁移
scheduleRemoteAbility(new ISelectResult() {
@Override
public void onSelectResult(String deviceId) {
continueAbility(deviceId);
}
});
}
};
FA 的迁移还涉及到状态数据的传递。为此,继承的 IAbilityContinuation 接口为开发者提供迁移过程中特定事件的管理能力。通过自定义迁移事件相关的行为,最终实现对 Ability 的迁移。具体的定义可以参考相关的 API 文档,此处主要以较为常用的两个事件,包括迁移发起端完成迁移的回调 onCompleteContinuation(int result)以及接收到远端迁移行为传递数据的回调 onRestoreData(IntentParams restoreData)。其他还包括迁移到远端设备的 FA 关闭的回调 onRemoteTerminated()、用于本地迁移发起时保存状态数据的回调 onSaveData(IntentParams saveData)和本地发起迁移的回调 onStartContinuation()。按照实际应用自定义特定场景对应的回调,可以完成多种场景下 FA 的迁移任务。
@Override
public boolean onSaveData(IntentParams saveData) {
String exampleData = String.valueOf(System.currentTimeMillis());
saveData.setParam("continueParam", exampleData);
return true;
}
@Override
public boolean onRestoreData(IntentParams restoreData) {
// 远端FA迁移传来的状态数据,开发者可以按照特定的场景对这些数据进行处理
Object data = restoreData.getParam("continueParam");
return true;
}
@Override
public void onCompleteContinuation(int result) {
btnContinueRemoteFA.setText("ContinueAbility Done");
}
FA 迁移可以打通设备间的壁垒,有助于不同能力的设备进行互助。前文以一个简单的例子介绍如何通过分布式任务调度提供的能力,实现 FA 跨设备的迁移(包括 FA 启动及状态数据的同步)。
考。