我正在开发一个项目,涉及 跟踪 (640, 640) 网格上的数千个点。 跟踪这些点时,我将它们的运动以向量线性数组的格式存储,解释每个点的变化位置采用以下格式:
v = [starting_x, starting_y, distance_x, distance_y]
我(为了项目的缘故)必须创建一个与上述网格 (640, 640) 大小相同的矢量场。然而,给我的数据量只涵盖了该网格的几个点。因此 我迭代整个网格,计算每个单独的点与数组中存储的所有向量之间的距离 然后我按距离找到最接近的向量索引,并将该像素的值替换为该向量的 [dx, dy] 值。
我最初使用矩阵乘法和 Hadamard 乘积来实现此目的,但 最近数据量突然增加从 ~300 增加到超过 2000,导致大量内存溢出 ,因为我正在处理的矩阵大小几乎达到 13GB。 (为了正确减去所有点的坐标,我必须生成一个大小为 640x640x2000x2 的数组,其中包含每个点的坐标。) 考虑到我的项目将来应该在更大的数据集上运行,我已将初始方法更改为具有三个嵌套循环的简单迭代(是的,我知道这听起来有多可怕)。这样,通过一些优化和预先计算,我将同一数据集的主动使用数据量从 13GB 限制到最大 2.5GB。
然而,处理时间从之前的约 10 秒猛增到约 100 分钟,我不会真正考虑升级。
因此, 我需要找到一种以更高的效率处理大量数据的方法。
[编辑]: 这是当前形状的函数:
def spread_vectors_by_least_distance(mainvectors: np.array):
length = mainvectors.shape[0] # this is the amount of vectors we're working with
# Pre-calculating repetitive data:
mvectors_help = np.zeros((length, 3), np.uint16)
for z in range(length):
y_, x_ = mainvectors[z, :2]
mvectors_help[z] = (y_*y_ + x_*x_, 2*y_, 2*x_)
width, height = 640, 640
deltas = np.zeros((height, width, length), np.uint16)
for y in range(height):
ysqr = y*y
for x in range(width):
xsqr = x*x
xsqr_ysqr = ysqr + xsqr
delta_min = 999_999_999
index = 0
for z in range(length):
xy_sqr, y_dbl, x_dbl = mvectors_help[z]
# delta = (x-x_)^2 + (y-y_)^2, which simplifies to:
delta = xy_sqr + xsqr_ysqr - y*y_dbl - x*x_dbl
# We only need the smallest distance
if delta_min > delta:
delta_min = delta
index = z
# The point's value is the closest vector's value:
deltas[y, x] = mainvectors[index, 2:]
return deltas
我知道现在可以采取两 (2) 条路径来解决我的问题:
1。我可以找到一种通过 numpy 优化的方式迭代该数组的内置方法,执行大约 800_000_000 次代数运算和数组元素替换将花费合理的时间。 2。我可以使用 python-c 或 python-c++ 绑定
以更快的语言执行所有数据处理(根据我收集的数据,速度快十倍甚至一百倍),但这意味着要花费无数的时间学习Python的新模块(例如cython)并用我不太理解的语言调试代码。 因此,我向世界各地的所有Python大师寻求
寻找最简单的方法,但对我的问题来说是最有效的解决方案。 search of the simplest, yet most efficient solution to my problem.
可以通过使用 NumPy 的广播和向量化操作来显著提高代码速度,而不是使用循环。这就是一种方法:
import numpy as np
def spread_vectors_by_least_distance_optimized(mainvectors: np.array):
length = mainvectors.shape[0]
# 创建坐标网格
grid_x, grid_y = np.meshgrid(np.arange(640), np.arange(640))
# 将坐标重塑为 (640*640, 2) 数组
grid_points = np.stack((grid_x.ravel(), grid_y.ravel()), axis=1)
# 使用广播计算所有点和所有向量的距离
distances = np.sum((grid_points[:, np.newaxis, :] - mainvectors[:, :2])**2, axis=2)
# 找到每个网格点最近的向量的索引
closest_indices = np.argmin(distances, axis=1)
# 从 mainvectors 中获取最近向量的 dx, dy 值
deltas = mainvectors[closest_indices, 2:][:]
# 将结果重塑回 (640, 640, 2) 数组
return deltas.reshape(640, 640, 2)
解释:
-
创建坐标网格:
np.meshgrid
用于创建网格上所有点的坐标。 - 重塑坐标: 坐标被展平成一个 (640*640, 2) 的数组,以便进行广播。
-
计算距离:
利用广播,计算所有网格点与所有
mainvectors
之间的平方距离。 -
找到最近的索引:
np.argmin
用于找到distances
数组中沿轴 1(即对于每个网格点)的最小值的索引,该索引对应于最近的向量。 -
提取 dx, dy 值:
使用找到的索引从
mainvectors
中提取最近向量的dx, dy
值。 -
重塑结果:
最后,将
deltas
数组重塑回所需的 (640, 640, 2) 形状。
这段代码利用了 NumPy 的向量化操作,消除了 Python 循环并显著提高了性能。这种方法应该可以快速处理 2000 个向量,并且内存效率也更高,因为它避免创建大型中间数组。
标签:python,numpy,numpy-ndarray,numpy-slicing From: 78799631