首页 > 编程语言 >Python批量下载微信公众号内的文字和图片

Python批量下载微信公众号内的文字和图片

时间:2024-07-12 13:31:20浏览次数:9  
标签:key 批量 img Python 微信 self url file path

mport ctypes
import subprocess
import sys
import os
import random
import re
import uuid
import shutil
import datetime
import requests
import secrets
from bs4 import BeautifulSoup
from qiniu import Auth, put_file, BucketManager, urlsafe_base64_encode
import qiniu.config
import mimetypes
import pyperclip
from datetime import datetime as dt
from PyQt5.QtWidgets import QApplication, QWidget, QVBoxLayout, QLabel, QPushButton, QHBoxLayout, QLineEdit, QTextEdit
from PyQt5.QtCore import Qt, QSettings, QSize, QPoint, pyqtSignal, QThread
from PyQt5.QtGui import QDragEnterEvent, QDropEvent
 
# 隐藏命令行窗口
def hide_console():
    hwnd = ctypes.windll.kernel32.GetConsoleWindow()
    if hwnd:
        ctypes.windll.user32.ShowWindow(hwnd, 0)
 
# 设置窗口位置和大小
def set_console_position_and_size(left, top, width, height):
    # 获取控制台窗口句柄
    hwnd = ctypes.windll.kernel32.GetConsoleWindow()
 
    if hwnd:
        # 调用 MoveWindow 函数来设置窗口位置和大小
        ctypes.windll.user32.MoveWindow(hwnd, left, top, width, height, True)
 
# 设置窗口位置 (左, 上) 和大小 (宽度, 高度)
left = -7
top = 1506
width = 335
height = 200
 
# 设置命令行窗口的大小和位置
set_console_position_and_size(left, top, width, height)
 
# 七牛云配置信息
access_key = '手动填写'
secret_key = '手动填写'
bucket_name = '手动填写'
domain = '手动填写'
pipeline = 'default.sys'
key = 'l/tupian/'
fops = 'imageView2/0/interlace/1/q/90|imageslim'
uploaded_images = {}  # 用于存储已经上传过的图片
 
q = Auth(access_key, secret_key)
bucket = BucketManager(q)
policy = {'persistentPipeline': pipeline}
 
if os.path.exists('.qiniu_pythonsdk_hostscache.json'):
    os.remove('.qiniu_pythonsdk_hostscache.json')
 
class FetchImagesThread(QThread):
    progress_signal = pyqtSignal(str)
 
    def __init__(self, url):
        super().__init__()
        self.url = url
 
    def run(self):
        self.fetch_images_from_url(self.url)
 
    def fetch_images_from_url(self, url):
        response = requests.get(url)
        soup = BeautifulSoup(response.text, 'html.parser')
        content = soup.find('div', {'id': 'js_content'})
 
        if not os.path.exists('D:\\临时图片'):
            os.makedirs('D:\\临时图片')
 
        img_re = re.compile(r'<img [^<>]*src=["\']?([^"\'>]+)["\']?[^<>]*>')
        img_list = img_re.findall(str(content))
 
        bg_img_re = re.compile(r'background-image:\s*url\(["\']?([^"\'>]+)["\']?\)')
        bg_img_list = bg_img_re.findall(str(content))
 
        img_list.extend(bg_img_list)
 
        for index, img_url in enumerate(img_list):
            if img_url in uploaded_images:
                img_new_url = uploaded_images[img_url]
            else:
                response = requests.head(img_url)
                content_type = response.headers.get('content-type')
                img_type = content_type.split('/')[-1]
 
                if img_type == 'jpeg':
                    img_ext = '.jpg'
                elif img_type == 'png':
                    img_ext = '.png'
                elif img_type == 'gif':
                    img_ext = '.gif'
                elif img_type == 'svg+xml':
                    img_ext = '.svg'
                else:
                    img_ext = '.jpg'
 
                new_img_url = re.sub(r'\.\w+$', img_ext, img_url)
 
                img_data = requests.get(new_img_url).content
                img_name = f"D:\\临时图片\\{secrets.token_hex(4)}{img_ext}"
                with open(img_name, 'wb') as f:
                    f.write(img_data)
 
                today = datetime.datetime.now().strftime('%Y/%m-%d')
                title = url.split('/')[-1].split('.')[0]
                img_key = f"{key}{today}/{title}/{secrets.token_hex(4)}{img_ext}"
                token = q.upload_token(bucket_name, img_key)
                ret, info = put_file(token, img_key, img_name, check_crc=True)
 
                img_new_url = f"{domain}/{img_key}"
                uploaded_images[img_url] = img_new_url
 
            content = re.sub(r'src=["\']?' + re.escape(img_url) + r'["\']?', f'src="{img_new_url}"', str(content))
            content = re.sub(r'background-image:\s*url\(["\']?' + re.escape(img_url) + r'["\']?\)', f'background-image: url("{img_new_url}")', str(content))
 
            self.progress_signal.emit(f"替换后的链接: {img_new_url}")
 
        content = re.sub(r'data-src', 'src', content)
 
        timestamp = datetime.datetime.now().strftime('%H.%M.%S')
        file_name = f"C:/Users/Administrator/Desktop/采集_{timestamp}.html"
        with open(file_name, 'w', encoding='utf-8') as f:
            # 删除生成的无用代码段
            cleaned_content = re.sub(r'style="display: none;"', '', str(content))
            f.write(cleaned_content)
             
        self.progress_signal.emit(f"文件已保存至:{file_name}")
        shutil.rmtree('D:\\临时图片')
        self.progress_signal.emit("下载的文件已删除")
 
        if os.path.exists('.qiniu_pythonsdk_hostscache.json'):
            os.remove('.qiniu_pythonsdk_hostscache.json')
 
class DragDropWidget(QWidget):
    def __init__(self):
        super().__init__()
 
        self.init_ui()
        self.setAcceptDrops(True)
        self.last_file_is_video = False
 
        # 设置窗口位置到左下角
        screen = QApplication.primaryScreen().availableGeometry()
        self.move(screen.left(), screen.bottom() - self.height() - 276)
 
        # 读取上次保存的窗口位置和大小
        settings = QSettings('MyApp', 'DragDropWidget')
        self.resize(settings.value('size', QSize(400, 300)))
        self.move(settings.value('pos', QPoint(300, 300)))
 
    def init_ui(self):
        self.setWindowTitle('图片+视频+公众号采集')
        self.setGeometry(300, 300, 400, 300)  # 窗口大小设置为400x300
 
        layout = QVBoxLayout()
 
        self.url_input = QLineEdit(self)
        self.url_input.setPlaceholderText('请输入公众号文章的链接')
        layout.addWidget(self.url_input)
 
        self.fetch_button = QPushButton('点击采集', self)
        self.fetch_button.clicked.connect(self.fetch_images)
        layout.addWidget(self.fetch_button)
 
        self.label = QLabel('请拖拽文件或文件夹到此窗口进行上传', self)
        self.label.setAlignment(Qt.AlignCenter)
        layout.addWidget(self.label)
 
        self.result_label = QLabel('', self)
        self.result_label.setAlignment(Qt.AlignCenter)
        layout.addWidget(self.result_label)
 
        self.progress_text = QTextEdit(self)
        self.progress_text.setReadOnly(True)
        layout.addWidget(self.progress_text)
 
        self.size_label = QLabel('当前默认尺寸:宽1200px', self)
        self.size_label.setAlignment(Qt.AlignCenter)
        layout.addWidget(self.size_label)
 
        button_layout = QHBoxLayout()
        self.button_1200px = QPushButton('宽1200px', self)
        self.button_1200px.clicked.connect(lambda: self.set_size_option('1200px'))
        self.button_original = QPushButton('原始尺寸', self)
        self.button_original.clicked.connect(lambda: self.set_size_option('原始尺寸'))
        button_layout.addWidget(self.button_1200px)
        button_layout.addWidget(self.button_original)
 
        layout.addLayout(button_layout)
        self.setLayout(layout)
 
        self.size_option = '1200px'
 
    def fetch_images(self):
        url = self.url_input.text().strip()
        if url:
            self.progress_text.append("开始采集图片...")
            self.thread = FetchImagesThread(url)
            self.thread.progress_signal.connect(self.update_progress)
            self.thread.start()
 
    def set_size_option(self, option):
        self.size_option = option
        self.size_label.setText(f'当前选择:{option}')
        if option == '1200px':
            self.button_1200px.setStyleSheet('background-color: lightblue')
            self.button_original.setStyleSheet('')
        else:
            self.button_original.setStyleSheet('background-color: lightblue')
            self.button_1200px.setStyleSheet('')
 
    def dragEnterEvent(self, event):
        if event.mimeData().hasUrls():
            event.acceptProposedAction()
 
    def dropEvent(self, event):
        uploaded_files = []
        for url in event.mimeData().urls():
            file_path = url.toLocalFile()
            mime_type, _ = mimetypes.guess_type(file_path)
 
            if os.path.isdir(file_path):
                # 如果是文件夹,遍历文件夹内所有文件并上传
                for root, dirs, files in os.walk(file_path):
                    for file in files:
                        file_path = os.path.join(root, file)
                        result = self.upload_file_or_video(file_path)
                        if result:
                            uploaded_files.append(result)
            else:
                # 如果是文件,直接上传
                result = self.upload_file_or_video(file_path)
                if result:
                    uploaded_files.append(result)
 
        if uploaded_files:
            self.result_label.setText(f'上传成功,共上传 {len(uploaded_files)} 个文件')
            self.generate_html(uploaded_files)
        else:
            self.result_label.setText('上传失败,请重试')
 
    def upload_file_or_video(self, file_path):
        key_prefix = 'l/tupian/' if file_path.lower().endswith(('.jpg', '.jpeg', '.png', '.gif')) else 'l/shipin/'
 
        key = key_prefix + dt.now().strftime('%Y/%m-%d') + '/' + str(random.randint(10000000, 99999999)) + os.path.basename(file_path)
        saveas_key = urlsafe_base64_encode(f'{bucket_name}:{key}')
 
        if key_prefix == 'l/tupian/':
            fops = 'imageView2/0/interlace/1/q/90|imageslim|saveas/' + saveas_key
        else:
            fops = 'vframe/jpg/offset/0.1/saveas/' + saveas_key
 
        policy['persistentOps'] = fops
        token = q.upload_token(bucket_name, key, 3600, policy=policy)
        ret, info = put_file(token, key, file_path, version='v2')
 
        if info.status_code == 200:
            self.update_progress(f'{key} {"图片" if key_prefix == "l/tupian/" else "视频"}上传成功')
            return f'{domain}/{key}'
 
        return None
 
    def generate_html(self, uploaded_files):
        desktop_path = os.path.join(os.path.expanduser("~"), "Desktop") 
        html_path = os.path.join(desktop_path, 'uploaded_files.html')
        with open(html_path, 'w', encoding='utf-8') as f:
            f.write('<style style="text-align: center;"></style><p>')
            for i, file in enumerate(uploaded_files):
                if file.endswith(('.jpg', '.jpeg', '.png', '.gif')):
                    if self.size_option == '1200px':
                        f.write(f'<img src="{file}" style="width: 1200px;"/>')
                    else:
                        f.write(f'<img src="{file}" style="width: auto;"/>')
                elif file.endswith(('.mp4', '.avi', '.mkv', '.mov')):
                    if not self.last_file_is_video:
                        f.write(f'<video poster="{file}?vframe/jpg/offset/0.1" src="{file}" controls="controls" style="width:100%; height:100%;" autoplay="autoplay" webkit-playsinline="true" playsinline="true" x5-playsinline="true"></video></p>')  
                    # Copy the video code to the clipboard
                    pyperclip.copy(f'<p><video poster="{file}?vframe/jpg/offset/0.1" src="{file}" controls="controls" style="width:100%; height:100%;" autoplay="autoplay" webkit-playsinline="true" playsinline="true" x5-playsinline="true"></video></p>')
                    self.last_file_is_video = True
 
        self.update_progress(f'已生成HTML文件:{html_path}')
        if os.path.exists('.qiniu_pythonsdk_hostscache.json'):
            os.remove('.qiniu_pythonsdk_hostscache.json')
 
    def update_progress(self, message):
        self.progress_text.append(message)
 
    def closeEvent(self, event):
        # 保存窗口位置和大小
        settings = QSettings('MyApp', 'DragDropWidget')
        settings.setValue('size', self.size())
        settings.setValue('pos', self.pos())
        event.accept()
 
if __name__ == '__main__':
    hide_console()
    app = QApplication(sys.argv)
    widget = DragDropWidget()
    widget.show()
    sys.exit(app.exec_())

标签:key,批量,img,Python,微信,self,url,file,path
From: https://blog.csdn.net/cc605523/article/details/140264335

相关文章

  • Python爬虫抓取笔趣阁小说(含源码)
    学习一下思路:1.我们进入需要爬取到的小说界面,右键开发者工具,选中元素显示,然后找到需要爬取的小说章节模块在代码中的位置。将a标签中的文本内容复制,然后ctrl+u打开源代码ctrl+f将刚刚的文本内容复制查找是否有这个模块。(比较爽的是,刚好这里有,可以不需要去查看网络请求......
  • python编程实例 计算输入内容中数字、字母、空格、其它字符的数量 两种方式实现
    第一种方式为通过python自带函数实现第二种方式为通过ascii码实现点击查看代码#字符串构成,统计出字符串中#空格英文字符数字其它字符的数量'''使用自带函数a=input("请输入:")kong=0ying=0shu=0qita=0foriinrange(len(a)):if(a[i].isspace()):kong......
  • 【PYTHON】运行环境配置之安装tar.gz压缩包
    我们经常会遇到从PYPI网站下载的安装文件没有whl格式,这时我们需要安装tar.gz格式的安装文件。比如图1所示的文件类型,这时我们需要先将该压缩包解压缩得到图2。图1图2接着,我们通过win+r,再输入cmd的方式得到运行窗口如图3。图3接着,我们利用pipinstall+解压后文件夹路......
  • python爬虫案例——5行代码爬取音乐资源
    都2024年了,不会还有人在花钱用VIP下载音乐吧~每天一个小妙招,教你5行代码轻松白嫖资源小白须知电脑需配备python解释器、安装一款编程软件,例如VisualStudioCode、pycharm等VisualStudioCode官网:https://code.visualstudio.com/pycharm社区版:https://www.jetbrains.com......
  • python-小理与他的画(赛氪OJ)
    [题目描述]小理是个画家,他希望有一天他的画能让心仪的她看到。只是后来她有了他,他却只有他的画,他望着他的画,默默的发呆。可惜做题的你,画不出他画的她,所以,我们只好画点简单的画,或许有一天,你就会遇到画里她/他吗?因此我们规定画画的内容:输入一个正整数 n(n为奇数),输出高度为 n......
  • “好物”推荐+Xshell连接实例+使用Conda创建独立的Python环境
    目录主题:好易智算平台推荐+RTX4090DGPU实例租用演示+安装配置torch1.9.1+cuda11.1.1环境引言:算力的新时代平台介绍:技术与信任的结晶使用案例:实际使用展示创建实例开始使用连接实例(下文演示使用Xshell连接,后续传文件也有配套的sftp可以使用)sftp传文件查看服务器配置,使用C......
  • 【Python】【银河麒麟】在命令行中将Python模块安装在指定项目的环境中
    银河麒麟的Pycharm有时候无法使用镜像源安装Python模块,这里记录一下在终端安装模块到指定项目环境的过程:1.找到项目的venu/bin文件夹,里面应该包含activate文件 2.在venu/bin文件夹下打开终端(命令行),或者打开终端后将路径切换到 venu/bin下;3.在命令行输入pipinstal......
  • PowerShell发送企业微信消息
     Import-ModuleMicrosoft.PowerShell.UtilityFunctionSendWechat($user,$days){$corpid="wechat"$secret="123-446"$agentid="1000000"$auth_sring="https://qyapi.weixin.qq.com/cgi-bin/gettoken?......
  • 获取微信小程序页面路径
    2024/07/121.步骤2.注意事项3.参考1.步骤微信公众号关联小程序时需要用到小程序的页面路径,获取步骤如下:'登录微信公众平台——工具——生成小程序码——获取更多页面路径——填写微信号(好像不能用手机号替代)——点击开启——进入小程序——右上角三个点——左下角“复制页......
  • 生成带logo二维码+批量压缩导出
    importcom.google.zxing.BarcodeFormat;importcom.google.zxing.EncodeHintType;importcom.google.zxing.MultiFormatWriter;importcom.google.zxing.common.BitMatrix;importcom.google.zxing.qrcode.decoder.ErrorCorrectionLevel;importjavax.imageio.ImageIO;......