1. Code
This Python script batch-downloads images from the Sina (Weibo) image host using multiple threads. On every run it checks which images have already been downloaded and skips them.
import os
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed
import logging
import time
from tqdm import tqdm
def setup_logger():
    # Configure logging: write to a log file and to the console.
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    # File handler
    file_handler = logging.FileHandler('download.log')
    file_handler.setLevel(logging.INFO)
    # Console handler (errors only)
    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.ERROR)
    # Log format
    formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    file_handler.setFormatter(formatter)
    console_handler.setFormatter(formatter)
    # Attach the handlers to the root logger
    logger.addHandler(file_handler)
    logger.addHandler(console_handler)

def download_image_by_change_referer(output_dir, url):
    """
    Download an image by spoofing the Referer header.

    Args:
        output_dir: directory to save the image in
        url: image URL

    Returns: True if the download succeeded, False otherwise
    """
    headers = {
        'Referer': 'https://weibo.com/',
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.93 Safari/537.36'
    }
    try:
        response = requests.get(url, headers=headers, timeout=10)
        if response.status_code == 200:
            file_name = os.path.join(output_dir, url.split('/')[-1])
            with open(file_name, 'wb') as f:
                f.write(response.content)
            logging.info(f'Successfully downloaded {file_name}')
            return True
        else:
            logging.error(f'Failed to download {url}, status code: {response.status_code}')
            return False
    except requests.RequestException as e:
        logging.error(f'Error downloading {url}: {e}')
        return False
    # time.sleep(0.1)  # Throttle the request rate to avoid hitting the server too fast

def download_image_by_baidu_cache(output_dir, url):
    """
    Work around the hotlink protection by fetching the image through a
    third-party cache service.
    See https://code.newban.cn/466.html

    Args:
        output_dir: directory to save the image in
        url: image URL

    Returns: True if the download succeeded, False otherwise
    """
    headers = {
        'Referer': 'https://image.baidu.com/',
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.93 Safari/537.36'
    }
    try:
        # The original image URL is passed to the cache service as a query parameter.
        response = requests.get(f'https://image.baidu.com/search/down?url={url}', headers=headers, timeout=10)
        if response.status_code == 200:
            file_name = os.path.join(output_dir, url.split('/')[-1])
            with open(file_name, 'wb') as f:
                f.write(response.content)
            logging.info(f'Successfully downloaded {file_name}')
            return True
        else:
            logging.error(f'Failed to download {url}, status code: {response.status_code}')
            return False
    except requests.RequestException as e:
        logging.error(f'Error downloading {url}: {e}')
        return False

def batch_download(output_dir, urls, method='baidu_cache', max_workers=10):
    """
    Download images in parallel.

    Args:
        output_dir: directory to save the images in
        urls: list of image URLs
        method: download method, either 'change_referer' or 'baidu_cache'
        max_workers: maximum number of worker threads

    Returns: the number of successfully downloaded images and the number of failures
    """
    if method == 'change_referer':
        download_image_function = download_image_by_change_referer
    elif method == 'baidu_cache':
        download_image_function = download_image_by_baidu_cache
    else:
        raise ValueError(f'Invalid method: {method}')

    success_count, failed_count = 0, 0
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_url = {executor.submit(download_image_function, output_dir, url): url for url in urls}
        for future in tqdm(as_completed(future_to_url), total=len(urls)):
            url = future_to_url[future]
            try:
                result = future.result()
                if result:
                    success_count += 1
                else:
                    failed_count += 1
            except Exception as e:
                logging.error(f'Error processing {url}: {e}')
                failed_count += 1
    return success_count, failed_count

def get_all_image_urls(dataset_root_dir):
    """
    Collect all image URLs from the dataset.

    Args:
        dataset_root_dir: dataset root directory containing img_url_dev.json,
            img_url_test.json and img_url_train.json

    Returns: a set of image URLs
    """
    files = [r'img_url_dev.json', r'img_url_test.json', r'img_url_train.json']
    image_urls = set()
    for file in files:
        path = os.path.join(dataset_root_dir, file)
        with open(path, 'r', encoding='utf-8') as f:
            for line in f:
                # Each line holds one or more URLs separated by ';'
                image_urls.update(line.strip().split(';'))
    return image_urls

def get_downloaded_images(output_dir):
    """
    List images that have already been downloaded (to avoid downloading them again).

    Returns: the file names of the downloaded images
    """
    downloaded_images = os.listdir(output_dir)
    return downloaded_images

def main():
    setup_logger()
    config = dict(
        dataset_root_dir=r'D:\Library\Datasets\20_MMChat',
        output_dir='downloaded_images',
        download_image_method='baidu_cache',
    )
    # Collect the image URLs
    image_urls = get_all_image_urls(config['dataset_root_dir'])
    # Output directory
    output_dir = config['output_dir']
    os.makedirs(output_dir, exist_ok=True)
    # Skip images that have already been downloaded
    downloaded_images = set(get_downloaded_images(output_dir))
    image_urls_to_download = [url for url in image_urls if url.split('/')[-1] not in downloaded_images]
    logging.info(f'Total images: {len(image_urls)}, images to download: {len(image_urls_to_download)}')
    confirm = input(
        f'{len(image_urls)} images in total, {len(image_urls) - len(image_urls_to_download)} already downloaded and skipped, '
        f'{len(image_urls_to_download)} left to download. Proceed? (y/n): ')
    if confirm.lower() != 'y':
        return
    # Start the batch download
    success_count, failed_count = batch_download(output_dir, image_urls_to_download,
                                                 method=config['download_image_method'], max_workers=40)
    logging.info(f'Download finished. Success: {success_count}, Failed: {failed_count}')
    print(f'Download finished. Success: {success_count}, Failed: {failed_count}')


if __name__ == '__main__':
    main()
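To use the downloader as-is, just run the script: it collects the URLs, filters out what is already on disk, asks for confirmation, and starts downloading. If you would rather call the building blocks from your own code, a minimal sketch looks like this (download_images is only an assumed file name for the script above):

import os
from download_images import setup_logger, batch_download  # assumed module name

setup_logger()
os.makedirs('downloaded_images', exist_ok=True)
urls = ['https://wx2.sinaimg.cn/mw2048/bc5ca296ly1fpt9oq74vsj20hs0npdqp.jpg']
success, failed = batch_download('downloaded_images', urls, method='baidu_cache', max_workers=4)
print(f'Success: {success}, Failed: {failed}')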
2. Adapting the Script
The code is written for the MMChat dataset, but in principle it can batch-download any Sina image-host images whose direct links have stopped working because of hotlink protection.
You need to modify the get_all_image_urls function so that it collects the URLs of all the images you want to download. Its return value looks like this (a sketch of a possible replacement follows the example):
[
"https://wx2.sinaimg.cn/mw2048/bc5ca296ly1fpt9oq74vsj20hs0npdqp.jpg",
"https://wx3.sinaimg.cn/mw2048/8bec28c2ly1fg1i9o0liqj20zk0qo7bj.jpg",
"https://wx4.sinaimg.cn/mw2048/954d55d0ly1fmwvia87e1j20ku11210q.jpg",
...
]
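If your URLs are not in the MMChat json files but, say, in a plain text file with one URL per line, a drop-in replacement could look like the sketch below. The file name urls.txt is only an example, and the call in main() has to be adjusted to match the new argument:

def get_all_image_urls(url_file='urls.txt'):
    # Hypothetical variant: read one image URL per line from a plain text file,
    # skipping blank lines.
    with open(url_file, 'r', encoding='utf-8') as f:
        return {line.strip() for line in f if line.strip()}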
The get_downloaded_images function returns the images that have already been downloaded. If the three images above had already been downloaded, it would return the following (a variant that retries broken files is sketched after the list):
[
"bc5ca296ly1fpt9oq74vsj20hs0npdqp.jpg",
"8bec28c2ly1fg1i9o0liqj20zk0qo7bj.jpg",
"954d55d0ly1fmwvia87e1j20ku11210q.jpg",
]
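The filter in main() works purely on file names: the last path segment of each URL is compared against this collection. If you want interrupted runs to retry broken downloads, a slightly stricter variant (a sketch, assuming the same flat output_dir layout and the os import already at the top of the script) could ignore empty files:

def get_downloaded_images(output_dir):
    # Hypothetical variant: count only non-empty files as downloaded,
    # so zero-byte leftovers from interrupted runs are downloaded again.
    return [name for name in os.listdir(output_dir)
            if os.path.getsize(os.path.join(output_dir, name)) > 0]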
Finally, you can change how the images are downloaded; download_image_by_change_referer and download_image_by_baidu_cache are currently supported, and you can add your own method as sketched below.
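Any additional method only needs the same (output_dir, url) -> bool signature plus a matching branch in batch_download. Purely as an illustration (not part of the original script), a variant that routes requests through a local proxy might look like this; the proxy address is a placeholder, and the os / requests / logging imports at the top of the script are reused:

def download_image_by_proxy(output_dir, url):
    # Hypothetical example with the same signature as the existing download functions.
    proxies = {'http': 'http://127.0.0.1:7890', 'https': 'http://127.0.0.1:7890'}  # placeholder proxy
    headers = {'Referer': 'https://weibo.com/'}
    try:
        response = requests.get(url, headers=headers, proxies=proxies, timeout=10)
        if response.status_code != 200:
            logging.error(f'Failed to download {url}, status code: {response.status_code}')
            return False
        file_name = os.path.join(output_dir, url.split('/')[-1])
        with open(file_name, 'wb') as f:
            f.write(response.content)
        return True
    except requests.RequestException as e:
        logging.error(f'Error downloading {url}: {e}')
        return False

# and in batch_download:
# elif method == 'proxy':
#     download_image_function = download_image_by_proxy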