1. STK Containerization
Directory structure
- Contains the Python interpreter, the STK API library, the parallel computing library, and the STK-related installers
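Based on the ADD instructions in the Dockerfile below, the assets directory is assumed to contain roughly the following files (the build context also holds the Dockerfile itself; the exact layout may differ):
assets/
├── Crack_12.4.0_linux.tgz                       # crack/license files
├── Miniconda3-py38_4.12.0-Linux-x86_64.sh       # Miniconda installer (Python 3.8.13)
├── stk_binaries_v12.4.0.tgz                     # STK Engine binaries (includes the agi.stk12 Python wheel)
├── stk_data_v12.4.0.tgz                         # STK Engine data
├── STK_Parallel_Computing_Coordinator_v2.5.tgz  # parallel computing coordinator
├── STK_Parallel_Computing_Agent_v2.5.tgz        # parallel computing agent
└── agiparallel-2.5-py3-none-any.whl             # agiparallel client library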
Dockerfile
# Base stage, containing the Python environment
FROM centos:7 as stk-engine
# User
USER root
# Copy the required resource files
# Contents: two engine installer archives (binaries and data), the crack/license directory, and the Miniconda installer
# ADD copies each tgz file and extracts it automatically
ADD --chown=root assets/Crack_12.4.0_linux.tgz /apps/
ADD --chown=root assets/Miniconda3-py38_4.12.0-Linux-x86_64.sh /apps/
ADD --chown=root assets/stk_binaries_v12.4.0.tgz /apps/
ADD --chown=root assets/stk_data_v12.4.0.tgz /apps/
# Switch the working directory
WORKDIR /apps
# Crack -- grant execute permissions
RUN chmod u+x Crack_12.4.0/bin/*.so && \
    chmod u+x Crack_12.4.0/licensingclient/linx64/* && \
    chmod u+x Crack_12.4.0/shared_files/licensing/ansyslmd.ini && \
    chmod u+x Crack_12.4.0/shared_files/licensing/license_files/ansyslmd.lic && \
    chmod u+x Crack_12.4.0/shared_files/licensing/prodord/ansysli.prodord.xml
# Crack -- replace files (executables and license files)
RUN mv -f Crack_12.4.0/bin/* stk12.4.0/bin/ && \
    mv -f Crack_12.4.0/licensingclient/linx64/* stk12.4.0/licensingclient/linx64/ && \
    mv -f Crack_12.4.0/shared_files/licensing/ansyslmd.ini stk12.4.0/shared_files/licensing/ && \
    mv -f Crack_12.4.0/shared_files/licensing/license_files stk12.4.0/shared_files/licensing/ && \
    mv -f Crack_12.4.0/shared_files/licensing/prodord/ansysli.prodord.xml stk12.4.0/shared_files/licensing/prodord/
# Crack -- remove the crack directory
RUN rm -rf Crack_12.4.0
# Configure environment variables
ENV STK_USER_HOME=/apps/stk12.4.0
WORKDIR "${STK_USER_HOME}"
ENV LD_LIBRARY_PATH="${STK_USER_HOME}/bin" \
    PATH="${STK_USER_HOME}/bin:${PATH}" \
    STK_CONFIG_DIR="${STK_USER_HOME}/config" \
    STK_INSTALL_DIR="${STK_USER_HOME}"
# Add a new STK user
RUN ./bin/stkxnewuser --allowOnline=no
# Install Conda and Python 3.8.13
WORKDIR /apps
RUN bash Miniconda3-py38_4.12.0-Linux-x86_64.sh -b -p /apps/miniconda3 && rm -f Miniconda3-py38_4.12.0-Linux-x86_64.sh
ENV PATH="${PATH}:/apps/miniconda3/bin"
RUN /apps/miniconda3/bin/conda init bash
# Install the STK Python API library
RUN /apps/miniconda3/bin/python -m pip install "${STK_USER_HOME}/bin/AgPythonAPI/agi.stk12-12.4.0-py3-none-any.whl"
FROM centos:7 as stk-coordinator
USER root
# Add the Parallel Computing coordinator
ADD --chown=root assets/STK_Parallel_Computing_Coordinator_v2.5.tgz /apps/stk/
WORKDIR /apps/
# Install the .NET Core dependencies
RUN set -e; \
    yum -y install krb5-libs; \
    yum -y install libicu; \
    yum -y install openssl-libs; \
    yum -y install zlib; \
    yum clean all; \
    rm -rf /var/cache/yum
# Define the user home directory
ENV STK_USER_HOME=/apps/stk
# Add the executables to PATH
ENV PATH="${STK_USER_HOME}"/Coordinator/bin:"${PATH}"
# Listening port
EXPOSE 9090
# Start the coordinator
CMD AGI.Parallel.CoordinatorService --nostdin
FROM stk-engine as stk-agent
USER root
ADD --chown=root assets/STK_Parallel_Computing_Agent_v2.5.tgz "${STK_USER_HOME}"/
ADD --chown=root assets/agiparallel-2.5-py3-none-any.whl /apps/
# Install the .NET Core dependencies
RUN set -e; \
    yum -y install krb5-libs; \
    yum -y install libicu; \
    yum -y install openssl-libs; \
    yum -y install zlib; \
    yum clean all; \
    rm -rf /var/cache/yum
# Configure environment variables
# (note: Dockerfile ENV does not perform command substitution, so $(nproc) cannot be
#  evaluated here; the worker-process count is resolved at container start in CMD below)
ENV PATH="${STK_USER_HOME}"/Agent/bin:"${PATH}" \
    COORDINATOR="localhost" \
    PORT="9090"
# Install the parallel computing client library
RUN /apps/miniconda3/bin/python -m pip install "agiparallel-2.5-py3-none-any.whl" && rm -f "agiparallel-2.5-py3-none-any.whl"
# Start command: the worker-process count defaults to the number of CPUs
CMD NUMBER_OF_WORKER_PROCESSES="${NUMBER_OF_WORKER_PROCESSES:-$(nproc)}" AGI.Parallel.AgentService --nostdin --coordinator="${COORDINATOR}" --port="${PORT}"
Image build and push (parallel background processes)
set -e
# Each { ... }& block runs in a background process; wait blocks until all of them have finished
# Build the images
{
    # Build stk-engine
    docker build --network host --target stk-engine -t harbor.tiduyun.com/r_learning/stk-engine:v12.4.0 .
    # Build stk-agent (its stage is based on stk-engine, so it is built in the same process)
    docker build --network host --target stk-agent -t harbor.tiduyun.com/r_learning/stk-agent:v2.5 .
}&
{
    # Build stk-coordinator
    docker build --network host --target stk-coordinator -t harbor.tiduyun.com/r_learning/stk-coordinator:v2.5 .
}&
wait
# Push the images
{
    docker push harbor.tiduyun.com/r_learning/stk-engine:v12.4.0
}&
{
    docker push harbor.tiduyun.com/r_learning/stk-agent:v2.5
}&
{
    docker push harbor.tiduyun.com/r_learning/stk-coordinator:v2.5
}&
wait
2. Testing
# Verify that stk-engine works; if the version is printed correctly, the crack succeeded
# Run and enter the container
docker run -it harbor.tiduyun.com/r_learning/stk-engine:v12.4.0 bash
# Run inside the container
python -c "from agi.stk12.stkengine import STKEngine;stk = STKEngine.StartApplication(noGraphics=True);print(stk.Version)"
# Expected output: the STK version information
STK Engine v12.4.0
## Test the parallel computing framework by running tasks concurrently
# Start the coordinator
docker run -d --name coordinator --net host harbor.tiduyun.com/r_learning/stk-coordinator:v2.5
# Start the agent
docker run -d --name agent --net host harbor.tiduyun.com/r_learning/stk-agent:v2.5
#
# Start a container with the example script mounted into it and enter the container
docker run -it --rm --net host -v /<path>/to/client_example.py:/tmp/client_example.py -w /tmp harbor.tiduyun.com/r_learning/stk-agent:v2.5 bash
# docker run -it --rm --net host -v /home/wjy/Desktop/code/STKCodeExamples/StkEngineContainerization/linux/stk-parallel-computing-server/client_example.py:/tmp/client_example.py -w /tmp stk-agent:v2.5 bash
# Run the example
python client_example.py
# Expected output
Access intervals for analysis interval ('2022-10-27T03:15:46.645206+00:00', '2022-10-28T03:15:46.645206+00:00'):
null
Access intervals for analysis interval ('2022-01-01T00:00:00.000+00:00', '2022-01-02T00:00:00.000+00:00'):
{
"accessIntervals": [
{
"start": "2022-01-01T01:43:55.165+00:00",
"stop": "2022-01-01T01:47:24.146+00:00"
},
{
"start": "2022-01-01T03:17:24.676+00:00",
"stop": "2022-01-01T03:24:04.233+00:00"
},
{
"start": "2022-01-01T04:52:28.454+00:00",
"stop": "2022-01-01T04:59:20.829+00:00"
},
{
"start": "2022-01-01T06:28:37.315+00:00",
"stop": "2022-01-01T06:33:25.412+00:00"
}
]
}
3. STK Parallel Computing Example
Architecture diagram
Basic workflow
- Define a Task class > connect to the job scheduler > create a Job > add Tasks > submit the Job > wait for the Job to finish and collect the Task results
- Example
from agiparallel.client import ClusterJobScheduler
import random

# 1. Define the Task class
class PiDartboardAlgorithmTask():
    def __init__(self, darts_per_task):
        self.darts_per_task = darts_per_task

    # Task entry point, executed on an agent
    def execute(self):
        num_darts_in_circle = 0
        for i in range(self.darts_per_task):
            x = (random.random() - 0.5) * 2
            y = (random.random() - 0.5) * 2
            if (x * x + y * y) <= 1.0:
                num_darts_in_circle += 1
        self.result = num_darts_in_circle

def dartboard_example(num_tasks, darts_per_task):
    # 2. Connect to the scheduler
    with ClusterJobScheduler("localhost") as scheduler:
        scheduler.connect()
        # 3. Create a Job
        job = scheduler.create_job()
        job.name = "PiJob"
        job.description = "Computes digits of Pi using dartboard algorithm"
        # 4. Add the Tasks
        for i in range(num_tasks):
            job.add_task(PiDartboardAlgorithmTask(darts_per_task))
        # 5. Submit the Job
        job.submit()
        # 6. Wait for the Job to finish
        job.wait_until_done()
        # 7. Collect the result of each Task in the Job
        sum_of_darts = 0
        for i in range(num_tasks):
            sum_of_darts += job.tasks[i].result
        pi_approximation = 4 * sum_of_darts / (darts_per_task * num_tasks)
        print("PI is approximately {0:1.10f}".format(pi_approximation))

if __name__ == "__main__":
    dartboard_example(8, 1000000)
Example -- note the coordinator's two environment variables: the IP address (hostname) and the port number
from agiparallel.client import *
from agiparallel.constants import TaskProperties
from agiparallel.infrastructure.TaskEnvironment import TaskEnvironment
from agi.stk12.stkengine import STKEngine
from agi.stk12.stkobjects.stkobjects import AgESTKObjectType, IAgSatellite, IAgVePropagatorTwoBody
from datetime import datetime, timedelta, timezone
import json, os, uuid
def main():
    # Define the analysis times and intervals
    today = datetime.now(timezone.utc)
    tomorrow = today + timedelta(days=1)
    timeIntervals = [
        (today.isoformat(), tomorrow.isoformat()),
        ('2022-01-01T00:00:00.000+00:00', '2022-01-02T00:00:00.000+00:00')
    ]
    # Note: the coordinator's address and port come from environment variables
    # Coordinator IP address / hostname
    coordinatorHostname = os.getenv('COORDINATOR_HOSTNAME', default='localhost')
    # Coordinator port
    coordinatorPort = os.getenv('COORDINATOR_PORT', default='9090')
    # Connect to the coordinator -> create a job -> add tasks -> submit the job -> wait for completion -> collect the task results
    with ClusterJobScheduler(coordinatorHostname, int(coordinatorPort)) as client:
        client.connect()
        # Create a job
        job = client.create_job()
        # Set the STK environment the job runs in
        job.set_task_environment(StkTaskEnvironment())
        # Add the tasks
        for interval in timeIntervals:
            job.add_task(ComputeTask(*interval))
        # Submit the job
        job.submit()
        # Wait until the job finishes
        job.wait_until_done()
        # Read each task's result attribute
        for i in range(len(timeIntervals)):
            print()
            print(f'Access intervals for analysis interval {timeIntervals[i]}:')
            print(json.dumps(job.tasks[i].result, indent=4))
class StkTaskEnvironment(TaskEnvironment):
    """STK environment the tasks run in"""
    def __init__(self):
        self.unique_id = uuid.UUID("6DDE57D8-49F3-4343-9E2F-4047247C8B41")

    def setup(self):
        # Start the STK engine
        self.app = STKEngine.StartApplication(noGraphics=True)
        # Create the STK Object Model root object
        self.root = self.app.NewObjectRoot()
        # Set the date format
        self.root.UnitPreferences.SetCurrentUnit('DateFormat', 'ISO-LTZ')
        # Create a scenario
        self.root.NewScenario('Example')
        # Get the current scenario
        self.scenario = self.root.CurrentScenario
        # Add the scenario objects (a satellite and a place)
        self.mySatelliteObject = self.scenario.Children.New(AgESTKObjectType.eSatellite, 'MySatellite')
        self.myPlaceObject = self.scenario.Children.New(AgESTKObjectType.ePlace, 'MyPlace')
        self.mySatellite = IAgSatellite(self.mySatelliteObject)
        self.mySatellitePropagator = IAgVePropagatorTwoBody(self.mySatellite.Propagator)

    def teardown(self):
        """Release resources when the job finishes: close the scenario and shut down the STK engine"""
        if self.root.CurrentScenario is not None:
            self.root.CloseScenario()
        self.app.ShutDown()
class ComputeTask:
    """Definition of the Task to execute"""
    def __init__(self, startTime, stopTime):
        self.startTime = startTime
        self.stopTime = stopTime

    def execute(self):
        # Get the task logger
        log = self.get_property(TaskProperties.LOGGER)
        # Get the STK environment
        env = self.get_property(TaskProperties.ENVIRONMENT)
        if not env:
            raise Exception('could not get task environment!')
        # Run the computation inside the STK environment
        env.scenario.AnalysisInterval.SetStartAndStopTimes(self.startTime, self.stopTime)
        env.mySatellitePropagator.InitialState.OrbitEpoch.SetExplicitTime(self.startTime)
        env.mySatellitePropagator.Propagate()
        # Compute access from the satellite to the place
        access = env.mySatelliteObject.GetAccessToObject(env.myPlaceObject)
        access.ComputeAccess()
        # Get the computed access intervals
        accessIntervals = access.ComputedAccessIntervalTimes.ToArray(0, -1)
        intvllist = [{ 'start': interval[0], 'stop': interval[1] } for interval in accessIntervals]
        # Store the result
        self.result = { 'accessIntervals': intvllist }

if __name__ == "__main__":
    main()
4. Errors
OSError: libagutil.so: cannot open shared object file: No such file or directory
# The OS error above is raised when running the example directly on the host
# Cause: when the agi package is imported, it has logic that loads libagutil.so on non-NT systems; this fails on an Ubuntu host where the library is not present
# Fix: mount the code into the stk-agent container and run it there (see the docker run command in section 2)
5. Notes
- IP and port
    - When the coordinator runs, the IP of its node and its listening port must be known, so that agents can register and clients can submit jobs
    - When an agent starts, it needs the coordinator's node IP and port in order to register itself
    - When a client submits a job, it must specify the coordinator's node IP and port (see the sketch after this list)
- Python version
    - The Python version of the client submitting tasks must match the Python version of the agents executing them (Python 3.8.x and Python 3.9.x count as different versions); task submission to the job scheduler relies on Python's pickle module for serialization, so a version mismatch causes serialization errors (also covered in the sketch below)
- Dependency libraries
    - If the parallel task code depends on third-party (non-standard-library) packages, they must also be installed on the agent side, otherwise the parallel tasks cannot run
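A minimal client-side sketch tying these notes together (an illustration, not part of the original example): it assumes the script runs inside the stk-agent image, so that agiparallel, any third-party packages the tasks import, and the agents' Python 3.8 interpreter are all available, and it reuses the COORDINATOR_HOSTNAME / COORDINATOR_PORT environment variables from client_example.py above.
import os
import sys

from agiparallel.client import ClusterJobScheduler

# The agent image installs Python 3.8 via Miniconda3-py38; a client on a different
# minor version breaks the pickle-based task serialization
assert sys.version_info[:2] == (3, 8), "client Python version does not match the agents"

# Coordinator node IP/hostname and listening port (defaults match the Dockerfile)
host = os.getenv("COORDINATOR_HOSTNAME", "localhost")
port = int(os.getenv("COORDINATOR_PORT", "9090"))

with ClusterJobScheduler(host, port) as scheduler:
    scheduler.connect()
    job = scheduler.create_job()
    # ...add tasks, submit the job, and collect results as in the examples above...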
Product
- Similar to a Ray cluster: an STK cluster consists of one scheduler and multiple workers; combined with an online compiler/editor it provides STK object simulation (aerospace domain) and computing capabilities (mission analysis, planning, design, operation, and post-mission analysis for land, sea, air, and space scenarios)