import cv2
import os
import numpy as np
import argparse
import threading
import queue
import logging
from multiprocessing import Value
# 配置日志记录
logging.basicConfig(level=logging.INFO,
format='%(asctime)s === %(levelname)s === %(message)s')
def find_files(folder_path, file_queue, src_ext, finished_event):
"""
遍历文件夹及其子文件夹,将符合条件的文件路径放入队列。
"""
for root, dirs, files in os.walk(folder_path):
for file in files:
if file.lower().endswith(src_ext.lower()):
file_queue.put(os.path.join(root, file))
logging.info("文件搜索完成。")
finished_event.set() # 设置完成事件
def convert_file(file_queue, thread_id, processed_count, finished_event, dst_ext):
"""
从队列中获取文件路径,执行转换操作。
"""
while True:
try:
src_file_path = file_queue.get_nowait()
except queue.Empty:
# 检查是否完成事件被设置
if finished_event.is_set() and file_queue.empty():
logging.info(f"线程 {thread_id} 完成任务。")
break
continue # 如果队列为空且事件未设置,则继续循环
dst_file_path = os.path.splitext(src_file_path)[0] + dst_ext
try:
# 读取原始图像文件为二进制数据
with open(src_file_path, 'rb') as f:
image_data = f.read()
# 解码原始图像数据
image_array = np.frombuffer(image_data, np.uint8)
image = cv2.imdecode(image_array, cv2.IMREAD_COLOR)
# 检查图像是否成功加载
if image is not None:
# 编码为目标格式并保存
success, converted_data = cv2.imencode(dst_ext, image)
if success:
with open(dst_file_path, 'wb') as f:
f.write(converted_data)
logging.info(f"[线程 {thread_id}] 已将 {src_file_path} 转换为 {dst_file_path}")
os.remove(src_file_path)
with processed_count.get_lock():
processed_count.value += 1
else:
logging.warning(f"[线程 {thread_id}] 无法编码文件: {src_file_path}")
else:
logging.warning(f"[线程 {thread_id}] 无法读取文件: {src_file_path}")
except Exception as e:
logging.error(f"[线程 {thread_id}] 处理文件 {src_file_path} 时出错: {e}")
finally:
file_queue.task_done()
# 输出进度
# if processed_count.value % 10 == 0: # 每处理10个文件输出一次
logging.info(f"[线程 {thread_id}] 已处理 {processed_count.value} 个文件,队列中剩余 {file_queue.qsize()} 个文件。")
def main():
# 设置命令行参数解析
parser = argparse.ArgumentParser(description="将文件转换为目标格式。")
parser.add_argument('--folder_path', default=r"***", type=str, help='要转换的文件夹路径')
parser.add_argument('--src_ext', default='.png', type=str, help='源文件后缀')
parser.add_argument('--dst_ext', default='.jpg', type=str, help='目标文件后缀')
args = parser.parse_args()
logging.info(f"处理路径: {args.folder_path}, 源后缀: {args.src_ext}, 目标后缀: {args.dst_ext}")
# 创建一个队列用于存放文件路径
file_queue = queue.Queue()
# 创建一个共享计数器,用于记录已处理文件数量
processed_count = Value('i', 0)
# 创建一个事件对象,用于表示搜索线程完成
finished_event = threading.Event()
# 创建并启动线程来查找文件
finder_thread = threading.Thread(target=find_files, args=(args.folder_path, file_queue, args.src_ext, finished_event), daemon=True)
finder_thread.start()
# 创建并启动多个线程来转换文件
num_worker_threads = 8
worker_threads = []
for i in range(num_worker_threads):
t = threading.Thread(target=convert_file, args=(file_queue, i + 1, processed_count, finished_event, args.dst_ext), daemon=True)
t.start()
worker_threads.append(t)
# 等待文件查找线程完成
finder_thread.join()
# 等待队列中的所有任务完成
file_queue.join()
# 输出处理结果
logging.info(f"总共处理了 {processed_count.value} 个文件。")
# 等待所有工作线程完成
for t in worker_threads:
t.join()
logging.info("所有文件转换完成。")
if __name__ == "__main__":
main()
这段代码实现了一个多线程的文件转换程序,主要将指定文件夹中的图像文件从一种格式(如 .png
)转换为另一种格式(如 .jpg
)。它使用 os
遍历文件夹,queue
管理待处理文件,threading
启动多个线程进行文件转换,并利用 logging
记录处理进度。
主要功能包括:
- 文件搜索:
find_files
函数遍历指定路径,找到符合扩展名的文件,并将其路径放入队列。 - 文件转换:
convert_file
函数从队列中获取文件路径,读取图像数据并进行格式转换,最后保存为新文件,并记录处理状态。 - 主函数:设置命令行参数,初始化队列和计数器,启动文件搜索和转换线程,并在完成后输出处理结果。