首页 > 其他分享 >解决新浪微博图床 403 批量下载图片等资源(以 MMChat 数据集为例)

解决新浪微博图床 403 批量下载图片等资源(以 MMChat 数据集为例)

时间:2024-05-16 10:18:42浏览次数:26  
标签:MMChat url image 微博图 集为例 urls download output dir

目录


1. 代码

该 Python 脚本可多线程地批量下载新浪图床图片,每次下载会检查哪些图片已下载并过滤已下载的图片。

import os
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed
import logging
import time

from tqdm import tqdm


def setup_logger():
    # 设置日志记录:同时输出到文件和控制台
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)

    # 文件处理器
    file_handler = logging.FileHandler('download.log')
    file_handler.setLevel(logging.INFO)

    # 控制台处理器
    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.ERROR)

    # 日志格式
    formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    file_handler.setFormatter(formatter)
    console_handler.setFormatter(formatter)

    # 添加处理器到logger
    logger.addHandler(file_handler)
    logger.addHandler(console_handler)


def download_image_by_change_referer(output_dir, url):
    """
    下载图片
    Args:
        output_dir:     保存图片的文件夹
        url:            图片链接

    Returns:    下载成功返回True,否则返回False

    """
    headers = {
        'Referer': 'https://weibo.com/',
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.93 Safari/537.36'
    }

    try:
        response = requests.get(url, headers=headers, timeout=10)
        if response.status_code == 200:
            file_name = os.path.join(output_dir, url.split('/')[-1])
            with open(file_name, 'wb') as f:
                f.write(response.content)
            logging.info(f'Successfully downloaded {file_name}')
            return True
        else:
            logging.error(f'Failed to download {url}, status code: {response.status_code}')
            return False
    except requests.RequestException as e:
        logging.error(f'Error downloading {url}: {e}')
        return False
    # time.sleep(0.1)  # 限制请求频率,防止过快请求


def download_image_by_baidu_cache(output_dir, url):
    """
    使用第三方缓存服务来解决防盗链问题
    参考 https://code.newban.cn/466.html
    Args:
        output_dir:     保存图片的文件夹
        url:            图片链接

    Returns:            下载成功返回True,否则返回False

    """
    headers = {
        'Referer': 'https://image.baidu.com/',
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.93 Safari/537.36'
    }
    try:
        response = requests.get(f'https://image.baidu.com/search/down?url={url}', headers=headers, timeout=10)
        if response.status_code == 200:
            file_name = os.path.join(output_dir, url.split('/')[-1])
            with open(file_name, 'wb') as f:
                f.write(response.content)
            logging.info(f'Successfully downloaded {file_name}')
            return True
        else:
            logging.error(f'Failed to download {url}, status code: {response.status_code}')
            return False
    except requests.RequestException as e:
        logging.error(f'Error downloading {url}: {e}')
        return False


def batch_download(output_dir, urls, method='baidu_cache', max_workers=10):
    """
    批量下载图片
    Args:
        output_dir:     保存图片的文件夹
        urls:           图片链接列表
        max_workers:    最大线程数
        method:         下载方法,可选值为'change_referer'或'baidu_cache'

    Returns:            成功下载的图片数量和失败的图片数量

    """
    if method == 'change_referer':
        download_image_function = download_image_by_change_referer
    elif method == 'baidu_cache':
        download_image_function = download_image_by_baidu_cache
    else:
        raise ValueError(f'Invalid method: {method}')
    success_count, failed_count = 0, 0
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_url = {executor.submit(download_image_function, output_dir, url): url for url in urls}
        for future in tqdm(as_completed(future_to_url), total=len(urls)):
            url = future_to_url[future]
            try:
                result = future.result()
                if result:
                    success_count += 1
                else:
                    failed_count += 1
            except Exception as e:
                logging.error(f'Error processing {url}: {e}')
                failed_count += 1
    return success_count, failed_count


def get_all_image_urls(dataset_root_dir):
    """
    读取所有图片链接
    Args:
        dataset_root_dir:   数据集根目录,包含img_url_dev.json, img_url_test.json, img_url_train.json
    Returns:    图片链接列表

    """
    files = [r'img_url_dev.json', r'img_url_test.json', r'img_url_train.json']
    image_urls = set()
    for idx, file in enumerate(files):
        path = os.path.join(dataset_root_dir, file)
        with open(path, 'r', encoding='utf-8') as f:
            lines = f.readlines()
        for line in lines:
            image_urls.update(line.strip().split(';'))
    return image_urls


def get_downloaded_images(output_dir):
    """
    读取已下载的图片(避免重复下载)
    Returns:    已下载的图片集合

    """
    downloaded_images = os.listdir(output_dir)
    return downloaded_images


def main():
    setup_logger()
    config = dict(
        dataset_root_dir=r'D:\Library\Datasets\20_MMChat',
        output_dir='downloaded_images',
        download_image_method='baidu_cache',
    )

    # 图片链接列表
    image_urls = get_all_image_urls(config['dataset_root_dir'])
    # 目标文件夹
    output_dir = config['output_dir']
    os.makedirs(output_dir, exist_ok=True)
    # 过滤已下载的图片
    downloaded_images = set(get_downloaded_images(output_dir))
    image_urls_to_download = [url for url in image_urls if url.split('/')[-1] not in downloaded_images]
    logging.info(f'Total images: {len(image_urls)}, images to download: {len(image_urls_to_download)}')
    confirm = input(
        f'共有 {len(image_urls)} 张图片,已过滤 {len(image_urls) - len(image_urls_to_download)} 张已下载的图片,待下载 {len(image_urls_to_download)} 张图片,确认下载?(y/n): ')
    if confirm.lower() != 'y':
        return

    # 开始批量下载
    success_count, failed_count = batch_download(output_dir, image_urls_to_download,
                                                 method=config['download_image_method'], max_workers=40)

    logging.info(f'下载完成,Success: {success_count}, Failed: {failed_count}')
    print(f'下载完成,Success: {success_count}, Failed: {failed_count}')


if __name__ == '__main__':
    main()

2. 举一反三

该代码适用 MMChat 数据集,原理上支持所有新浪图床批量下载已失效的图片。

你需要修改 get_all_image_urls 方法来获取你想下载的所有图片的 URL 列表,该方法返回值:

[
    "https://wx2.sinaimg.cn/mw2048/bc5ca296ly1fpt9oq74vsj20hs0npdqp.jpg",
    "https://wx3.sinaimg.cn/mw2048/8bec28c2ly1fg1i9o0liqj20zk0qo7bj.jpg",
    "https://wx4.sinaimg.cn/mw2048/954d55d0ly1fmwvia87e1j20ku11210q.jpg",
    ...
]

get_downloaded_images 方法返回已下载的图片,如果上面的三张图片已下载,那么该方法会返回

[
    "bc5ca296ly1fpt9oq74vsj20hs0npdqp.jpg",
    "8bec28c2ly1fg1i9o0liqj20zk0qo7bj.jpg",
    "954d55d0ly1fmwvia87e1j20ku11210q.jpg",
]

最后,你可以修改图片的下载方式,目前支持 download_image_by_change_refererdownload_image_by_baidu_cache

标签:MMChat,url,image,微博图,集为例,urls,download,output,dir
From: https://www.cnblogs.com/coderjiang/p/18195443

相关文章

  • 微博图床被废了,自己动手丰衣足食
    大家好,我是JavaPub。前言对于想我一样的MarkDown博主来说,经常会遇到的一个问题,那就是图片处理,本地图片怎么放到网上被大家访问?这是就用到了图床工具,将图床方到一个第三方互联网网站上,然后用外链访问。尝试过阿里云、腾讯云、七牛云这些厂商得一些免费域名或者存储额度都有时效性......
  • 使用Faraday库采集微博图片
    之前我们写过一个微博采集程序,不是特别难,那么有朋友想让我用Faraday库来写一个微博的爬虫程序,还要用Ruby来采集微博的图片。果然,不费吹灰之力,它来了,一起来学习一下吧。```rubyrequire'faraday'require'nokogiri'proxy_host='https://www.duoip.cn/get_proxy'proxy_port=8......
  • 贝叶斯网络python实战(以泰坦尼克号数据集为例,pgmpy库)
    贝叶斯网络python实战(以泰坦尼克号数据集为例,pgmpy库)leida_wt 2019-03-2423:05:36  16815  收藏 140分类专栏: 机器学习 文章标签: pgmpy 贝叶斯网络 泰坦尼克 机器学习 图网络版权 文章目录贝叶斯网络简介贝叶斯推断思路贝叶斯网络贝叶斯网络的实现应用步骤泰坦尼克......
  • 图像处理:人群计数中密度图的生成——以ShanghaiTechA数据集为例
    1.前言记录密度图的生成,防止以后找不到。代码也是从别人那得来的2.高斯核模糊在生成伪装密度图时,使用如下公式:\[F(x)=\sum^N_{i=1}\delta(x-x_i)*G_{\sigma}(x)......
  • PaddleNLP基于ERNIR3.0文本分类:WOS数据集为例(层次分类)
    相关项目链接:​​Paddlenlp之UIE模型实战实体抽取任务【打车数据、快递单】​​​​Paddlenlp之UIE分类模型【以情感倾向分析新闻分类为例】含智能标注方案)​​​​应用实践......