我在使用 ffmpeg python 处理相机帧时遇到问题。我使用 process.communicate() 的第一种方法效果很好,但存在延迟问题。
process = (
ffmpeg
.input('pipe:', format='rawvideo', pix_fmt='rgb24', s='{}x{}'.format(width, height))
.filter(<filter_params>)
.output('pipe:', format='rawvideo', pix_fmt='rgb24')
.run_async(pipe_stdin=True, pipe_stdout=True, pipe_stderr=True)
)
out, err = process.communicate(input=img.tobytes())
output_image = np.frombuffer(out, np.uint8).reshape((height, width, channels))
为了减少延迟,我尝试保持 ffmpeg 进程打开并输入相机帧进行处理。这可以正常运行几分钟,延迟值可以接受,但最终会陷入死锁。解决这个问题的最佳方法是什么?
import cv2
import numpy as np
import math
import ffmpeg
def start_ffmpeg_process_async(width, height):
return (
ffmpeg
.input('pipe:', format='rawvideo', pix_fmt='rgb24', s='{}x{}'.format(width, height))
.filter('<filter variables>')
.output('pipe:', format='rawvideo', pix_fmt='rgb24')
.run_async(pipe_stdin=True, pipe_stdout=True, pipe_stderr=True)
)
def main():
cap = cv2.VideoCapture(0)
if not cap.isOpened():
print("Camera can't open \nExiting...")
return -1
ffmpeg_process_async = start_ffmpeg_process_async(cap.get(cv2.CAP_PROP_FRAME_WIDTH),
cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
try:
while True:
success, img = cap.read()
if success:
height, width, channels = img.shape
ffmpeg_process_async.stdin.write(img.tobytes())
raw_output = ffmpeg_process_async.stdout.read(width * height * channels)
output_image = np.frombuffer(raw_output, np.uint8).reshape((height, width, channels))
cv2.imshow('Webcam', output_image)
else:
print ("Camera read error")
if cv2.waitKey(1) == ord('q'):
print ("Exiting . . .")
break
finally:
print ("Finalizing . . .")
ffmpeg_process_async.stdin.close()
ffmpeg_process_async.wait()
cap.release()
cv2.destroyAllWindows()
if __name__ == "__main__":
main()
代码中出现的死锁问题是由于
ffmpeg_process_async.stdout.read(width * height * channels)
引起的。这个函数会尝试从 ffmpeg 进程的标准输出中读取指定字节数的数据,如果读取不到指定字节数,它会一直阻塞,直到读取到为止或者进程结束。
问题分析:
-
FFmpeg 输出不固定:
FFmpeg 滤镜处理后的视频帧大小可能不是始终等于
width * height * channels
。这会导致read()
函数一直等待接收足够的数据,从而造成阻塞。 - 循环读取: 代码在循环中不断读取 FFmpeg 的输出,而没有机制保证 FFmpeg 已经输出了足够的数据。
解决方案:
为了解决这个问题,我们需要确保读取 FFmpeg 输出的操作不会阻塞主线程。以下是一些可行的方案:
1. 使用非阻塞读取:
可以将
ffmpeg_process_async.stdout
设置为非阻塞模式,这样
read()
函数就不会阻塞。
```python import fcntl import os
# ...
flags = fcntl.fcntl(ffmpeg_process_async.stdout.fileno(), fcntl.F_GETFL) fcntl.fcntl(ffmpeg_process_async.stdout.fileno(), fcntl.F_SETFL, flags | os.O_NONBLOCK)
# ...
raw_output = ffmpeg_process_async.stdout.read(width * height * channels) if raw_output: # 处理输出图像 ```
需要注意的是,非阻塞读取可能会返回空数据,需要添加判断逻辑来处理这种情况。
2. 使用队列:
可以使用队列来异步处理 FFmpeg 的输出。创建一个单独的线程用于读取 FFmpeg 的输出并将其放入队列中,主线程则从队列中获取数据。
```python import queue
# ...
output_queue = queue.Queue()
def read_output(process, queue): while True: data = process.stdout.read(1024) if not data: break queue.put(data)
# 创建读取线程 read_thread = threading.Thread(target=read_output, args=(ffmpeg_process_async, output_queue)) read_thread.start()
# ...
try: while True: # ... 处理图像 ...
try:
raw_output = output_queue.get_nowait() # 从队列中获取数据
# 处理输出图像
except queue.Empty:
pass # 处理队列为空的情况
# ... ```
3. 使用
subprocess.PIPE
和
poll()
:
可以使用 `subprocess.PIPE` 创建管道,并使用 `poll()` 方法检查是否有数据可读,避免阻塞。
```python
import subprocess
import time
# ...
ffmpeg_process_async = subprocess.Popen(
ffmpeg_command,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
# ...
while True:
# ... 处理图像 ...
if ffmpeg_process_async.poll() is not None:
print("FFmpeg process exited unexpectedly")
break
if ffmpeg_process_async.stdout.readable():
raw_output = ffmpeg_process_async.stdout.read(width * height * channels)
# 处理输出图像
time.sleep(0.01) # 避免 CPU 占用过高
# ...
```
选择哪种方案取决于具体的需求和代码结构。建议先尝试使用非阻塞读取或队列,如果效果不理想,再考虑使用
subprocess.PIPE
和
poll()
。