首页 > 编程语言 >微信公众号_爬虫_fiddler_抓包_python

微信公众号_爬虫_fiddler_抓包_python

时间:2023-07-02 23:23:11浏览次数:85  
标签:fiddler get python 微信 self db user msg id

wechat_python/run.py

from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base

# Declarative base class that the ORM model below inherits from.
Base = declarative_base()

class ArticleInfo(Base):
    """ORM model mapped to the ``article_info`` table.

    Every attribute is a plain ``Integer`` or ``String(255)`` column.
    NOTE(review): the column names look like the keys of one entry in a
    WeChat official-account message list -- confirm against a captured
    response before relying on their meaning.
    """
    __tablename__ = 'article_info'

    # Primary key; the demo insert in this script supplies it explicitly.
    id = Column(Integer, primary_key=True)
    type = Column(Integer)
    # Stored as an integer (sample value 1685929036); presumably a Unix
    # timestamp in seconds -- TODO confirm.
    datetime = Column(Integer)
    fakeid = Column(String(255))
    status = Column(Integer)
    # NOTE(review): every text column is capped at 255 characters by the
    # declared type; whether longer values are truncated or rejected depends
    # on the database backend.
    content = Column(String(255))
    title = Column(String(255))
    digest = Column(String(255))
    fileid = Column(Integer)
    content_url = Column(String(255))
    source_url = Column(String(255))
    cover = Column(String(255))
    subtype = Column(Integer)
    is_multi = Column(Integer)
    author = Column(String(255))
    copyright_stat = Column(Integer)
    duration = Column(Integer)
    del_flag = Column(Integer)
    item_show_type = Column(Integer)
    audio_fileid = Column(Integer)
    play_url = Column(String(255))
    malicious_title_reason_id = Column(Integer)
    malicious_content_type = Column(Integer)

# Initialise the database connection.
# NOTE: the argument is a placeholder (it reads "database connection string");
# replace it with a real SQLAlchemy URL before running the script.
engine = create_engine('数据库连接字符串')
# Session factory bound to the engine.
DBSession = sessionmaker(bind=engine)

# Create every table registered on Base (here: article_info).
Base.metadata.create_all(engine)

# Open a session for the demo insert.
session = DBSession()

try:
    # Build one sample ArticleInfo row.
    new_article_info = ArticleInfo(
        id=1000001146,
        type=49,
        datetime=1685929036,
        fakeid="3553419583",
        status=2,
        content="",
        title="自由速度 | 第二十四期:為賤民寫詩,我們都愛的圖卡拉姆",
        digest="自由速度Tempo Rubato,唱出靈魂的旋律,以專屬於你的自由速度。\t聊正在翻譯的書、終於印好的書、聊詩",
        fileid=0,
        content_url="http://mp.weixin.qq.com/s?__biz=MzU1MzQxOTU4Mw==&mid=2247492333&idx=1&sn=798a9b4bccff988fa186e32f0c7f0100&chksm=adcd9671ea93b00c2c59f8edfd2c1e36656be92220de174f9233450ac3bdbcc5ad14fcb8bff3&scene=27#wechat_redirect",
        source_url="",
        cover="https://mmbiz.qpic.cn/mmbiz_jpg/vAKkjup1cHHoHGZ1YpyLMBbYWSTicRTYDgAvmT9Uxkxs5atymvIttUIf48iatUryefCvvyGRpcwSKIeLH33aDUMA/0?wx_fmt=jpeg",
        subtype=9,
        is_multi=0,
        author="灵智宝鬘",
        copyright_stat=201,
        duration=0,
        del_flag=1,
        item_show_type=0,
        audio_fileid=0,
        play_url="",
        malicious_title_reason_id=0,
        malicious_content_type=0,
    )

    # Stage the row, then commit to persist it.
    session.add(new_article_info)
    session.commit()
except Exception:
    # Undo the failed transaction (for example a duplicate primary key when
    # the script is run a second time) and let the error surface.
    session.rollback()
    raise
finally:
    # Always release the connection, whether or not the insert succeeded.
    session.close()


wechat_python/app.py

# -*- coding: UTF-8 -*-
import json
import requests

# NOTE(review): the URL, the Referer header and the cookies below embed
# session credentials (key, pass_ticket, appmsg_token, wap_sid2) copied from
# a packet capture. They presumably expire quickly and should not be kept in
# version control -- load them from configuration instead.
url = 'https://mp.weixin.qq.com/mp/profile_ext?action=getmsg&__biz=MzU1MzQxOTU4Mw==&f=json&offset=10&count=10&is_ok=1&scene=124&uin=NDA4Mzg1NTk1&key=05f982ebd31a295d7f06c6f8740e78fac336d2374b0fc6eaf55e88cb7117e9d1c1e643ecff8baa5939b4db06a5136bdfdd69124faf76468d46cb62656e70a7d1d10787b28d94409acfe0f1b54662a0f841d48774691df99c68114de0e8b72941af46d9d0410d07f3906021f4595b3be758bbb85914dbc9ba2ae03c3ea9249dba&pass_ticket=jyt6bvMlhDNP1DG0u0Pi4M5rLkPI8AmLNcKrzeckfLllmrFkmmXpIXcTz1tZjq3A&wxtoken=&appmsg_token=1224_n0SizMWgDDIEsfJ7kQmon6r8wC9xDLc8AhNb3g~~&x5=0&f=json'
headers = {
    'Host': 'mp.weixin.qq.com',
    'Connection': 'keep-alive',
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.116 Safari/537.36 QBCore/4.0.1301.400 QQBrowser/9.0.2524.400 Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2875.116 Safari/537.36 NetType/WIFI MicroMessenger/7.0.5 WindowsWechat',
    'X-Requested-With': 'XMLHttpRequest',
    'Accept': '*/*',
    'Referer': 'https://mp.weixin.qq.com/mp/profile_ext?action=home&__biz=MzU1MzQxOTU4Mw==&scene=124&uin=NDA4Mzg1NTk1&key=05f982ebd31a295d7f06c6f8740e78fac336d2374b0fc6eaf55e88cb7117e9d1c1e643ecff8baa5939b4db06a5136bdfdd69124faf76468d46cb62656e70a7d1d10787b28d94409acfe0f1b54662a0f841d48774691df99c68114de0e8b72941af46d9d0410d07f3906021f4595b3be758bbb85914dbc9ba2ae03c3ea9249dba&devicetype=Windows+10+x64&version=62090538&lang=zh_CN&a8scene=7&acctmode=0&pass_ticket=jyt6bvMlhDNP1DG0u0Pi4M5rLkPI8AmLNcKrzeckfLllmrFkmmXpIXcTz1tZjq3A&winzoom=1',
    'Accept-Encoding': 'gzip, deflate',
    'Accept-Language': 'zh-CN,zh;q=0.8,en-US;q=0.6,en;q=0.5;q=0.4',
}
cookies = {
    'wxuin': '408385595',
    'devicetype': 'android-33',
    'version': '2800205d',
    'lang': 'zh_CN',
    'pass_ticket': 'jyt6bvMlhDNP1DG0u0Pi4M5rLkPI8AmLNcKrzeckfLllmrFkmmXpIXcTz1tZjq3A',
    'wap_sid2': 'CLvw3cIBEooBeV9ISW8zQU9YN3BrRXo5dXA0WlVFOUdGd2RBRlBleFlmU3hUNHNnekREZXAzOGpDOGRNSktqOWF3TWt3RDNKckNXRm5QMk1SYnlGeGpfa1pFQy12dXhrWGJFX01rdzk2aFp4UV9ZS3dIa2YtNjRRbDRyNHRwMm9QWUxITW5NY1R5UEthWVNBQUF+MP/FhKUGOA1AlU4',
}

# Unused placeholder kept so the module still exposes the same names.
data = {}

# NOTE(review): verify=False disables TLS certificate verification. It is
# presumably here so the request can pass through a local capturing proxy;
# remove it when talking to the server directly.
# The timeout stops the script from hanging forever on a stalled connection.
html = requests.get(url, headers=headers, verify=False, cookies=cookies, timeout=10)
html.raise_for_status()
print(html.text)
res_dict = json.loads(html.text)

# The importer in src/main.py decodes "general_msg_list" with json.loads,
# i.e. the field is itself a JSON-encoded string. Decode it here as well
# instead of calling .get() on a str; an already-decoded dict is accepted too.
general_msg_list = res_dict.get("general_msg_list")
if isinstance(general_msg_list, str):
    general_msg_list = json.loads(general_msg_list)
msg_list = (general_msg_list or {}).get('list') or []

for msg in msg_list:
    # Either sub-object may be missing; fall back to an empty dict so the
    # loop prints None for that entry instead of raising AttributeError.
    comm_info = msg.get('comm_msg_info') or {}
    ext_info = msg.get('app_msg_ext_info') or {}
    print(comm_info.get('id'))
    print(comm_info.get('datetime'))
    print(ext_info.get('title'))
    print(ext_info.get('digest'))
    print(ext_info.get('content_url'))
    print(ext_info.get('cover'))
    print(ext_info.get('multi_app_msg_item_list'))
    print('-----------------')





wechat_python/src/main.py

# -*- coding: UTF-8 -*-
import asyncio
import json
from models.article import ArticleInfo
from schemas.article import CreateArticle
from services import article_service
from db.sqlite import create_table



async def async_function():
    """Import the articles stored in ``res.json`` into the database.

    Steps:
      1. make sure the tables exist (``create_table``);
      2. read ``res.json`` from the current working directory and decode its
         ``general_msg_list`` field, which is itself a JSON-encoded string;
      3. build one ``CreateArticle`` per message and hand it to
         ``article_service.create_wechat_article``.

    Messages that lack ``comm_msg_info`` or ``app_msg_ext_info`` are skipped:
    the original code called ``.get`` on the missing object and crashed with
    ``AttributeError`` on the first such message.
    """
    await create_table()
    with open('res.json', 'r', encoding='utf-8') as f:
        cont_dict = json.load(f)

    app_msg_list = json.loads(cont_dict.get('general_msg_list')).get('list') or []

    # Keys copied one-to-one from each message's ``comm_msg_info`` object
    # (``id`` is handled separately because it is stored as ``msg_id``).
    comm_keys = ('type', 'datetime', 'fakeid', 'status', 'content')
    # Keys copied one-to-one from each message's ``app_msg_ext_info`` object.
    ext_keys = (
        'title', 'digest', 'fileid', 'content_url', 'source_url', 'cover',
        'subtype', 'is_multi', 'author', 'copyright_stat', 'duration',
        'del_flag', 'item_show_type', 'audio_fileid', 'play_url',
        'malicious_title_reason_id', 'malicious_content_type',
    )

    for msg in app_msg_list:
        comm_info = msg.get('comm_msg_info')
        ext_info = msg.get('app_msg_ext_info')
        if not comm_info or not ext_info:
            # Nothing to store without both halves of the message.
            continue

        print(ext_info.get('title'))
        article = CreateArticle(
            msg_id=comm_info.get('id'),
            **{key: comm_info.get(key) for key in comm_keys},
            **{key: ext_info.get(key) for key in ext_keys},
        )
        await article_service.create_wechat_article(article)


if __name__ == '__main__':
    asyncio.run(async_function())

wechat_python/src/core/conf.py

import os
import secrets

from pathlib import Path



class ST:
    """Filesystem locations shared by the rest of the project.

    ``BASE_PATH`` is a ``pathlib.Path``; the two derived locations are plain
    strings.
    """

    # Three directory levels above this file.
    BASE_PATH = Path(__file__).resolve().parent.parent.parent

    # Directory that holds the log files.
    LOG_PATH = str(BASE_PATH / 'log')

    # Directory that holds the SQLite database file.
    SQLITE_PATH = str(BASE_PATH / 'sqlite_db')








wechat_python/src/models/__init__.py

from db.sqlite import MappedBase

from .article import ArticleInfo



wechat_python/src/models/article.py

from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
from . import MappedBase

class ArticleInfo(MappedBase):
    """ORM model mapped to the ``article_info`` table.

    The importer in ``src/main.py`` fills ``msg_id``, ``type``, ``datetime``,
    ``fakeid``, ``status`` and ``content`` from a message's ``comm_msg_info``
    object and every remaining column from its ``app_msg_ext_info`` object.
    """
    __tablename__ = 'article_info'


    # Surrogate primary key generated by the database; the message's own id
    # is kept separately in ``msg_id``.
    id = Column(Integer,primary_key=True,autoincrement=True)
    # Stored as an integer; the service layer looks rows up by this value to
    # decide whether an article was already imported.
    # NOTE(review): presumably a Unix timestamp in seconds -- confirm.
    datetime = Column(Integer)
    msg_id = Column(Integer)
    type = Column(Integer)
    fakeid = Column(String(255))
    status = Column(Integer)
    # NOTE(review): every text column is declared as String(255); confirm
    # that titles, digests and URLs always fit.
    content = Column(String(255))
    title = Column(String(255))
    digest = Column(String(255))
    fileid = Column(Integer)
    content_url = Column(String(255))
    source_url = Column(String(255))
    cover = Column(String(255))
    subtype = Column(Integer)
    is_multi = Column(Integer)
    author = Column(String(255))
    copyright_stat = Column(Integer)
    duration = Column(Integer)
    del_flag = Column(Integer)
    item_show_type = Column(Integer)
    audio_fileid = Column(Integer)
    play_url = Column(String(255))
    malicious_title_reason_id = Column(Integer)
    malicious_content_type = Column(Integer)



# # 创建新的 ArticleInfo 对象
# new_article_info = ArticleInfo(
#         id=1000001146,
#         type=49,
#         datetime=1685929036,
#         fakeid="3553419583",
#         status=2,
#         content="",
#         title="自由速度 | 第二十四期:為賤民寫詩,我們都愛的圖卡拉姆",
#         digest="自由速度Tempo Rubato,唱出靈魂的旋律,以專屬於你的自由速度。\t聊正在翻譯的書、終於印好的書、聊詩",
#         fileid=0,
#         content_url="http://mp.weixin.qq.com/s?__biz=MzU1MzQxOTU4Mw==&mid=2247492333&idx=1&sn=798a9b4bccff988fa186e32f0c7f0100&chksm=adcd9671ea93b00c2c59f8edfd2c1e36656be92220de174f9233450ac3bdbcc5ad14fcb8bff3&scene=27#wechat_redirect",
#         source_url="",
#         cover="https://mmbiz.qpic.cn/mmbiz_jpg/vAKkjup1cHHoHGZ1YpyLMBbYWSTicRTYDgAvmT9Uxkxs5atymvIttUIf48iatUryefCvvyGRpcwSKIeLH33aDUMA/0?wx_fmt=jpeg",
#         subtype=9,
#         is_multi=0,
#         author="灵智宝鬘",
#         copyright_stat=201,
#         duration=0,
#         del_flag=1,
#         item_show_type=0,
#         audio_fileid=0,
#         play_url="",
#         malicious_title_reason_id=0,
#         malicious_content_type=0
# )

wechat_python/src/schemas/article.py

# -*- coding: utf-8 -*-
from typing import Optional, List
from pydantic import BaseModel, Field, EmailStr
from datetime import datetime,timedelta,timezone


class CreateArticle(BaseModel):
    """Validated payload for inserting one article row.

    The field names match the columns of the ``ArticleInfo`` model (except
    its auto-generated ``id``), so ``model_dump()`` can be passed straight to
    the model constructor. All fields are required.
    """

    # Taken from the message's ``comm_msg_info`` object by the importer.
    msg_id: int
    type: int
    datetime: int
    fakeid: str
    status: int
    content: str

    # Taken from the message's ``app_msg_ext_info`` object by the importer.
    title: str
    digest: str
    fileid: int
    content_url: str
    source_url: str
    cover: str
    subtype: int
    is_multi: int
    author: str
    copyright_stat: int
    duration: int
    del_flag: int
    item_show_type: int
    audio_fileid: int
    play_url: str
    malicious_title_reason_id: int
    malicious_content_type: int

    # pydantic v2 configuration (replaces the deprecated inner ``Config``
    # class). The example now uses this model's own fields; the previous one
    # described an unrelated schema (dev_id / station_id / created_at / name).
    model_config = {
        "json_schema_extra": {
            "example": {
                "msg_id": 1000001146,
                "type": 49,
                "datetime": 1685929036,
                "fakeid": "3553419583",
                "status": 2,
                "content": "",
                "title": "Example article title",
                "digest": "Short summary of the article",
                "fileid": 0,
                "content_url": "http://mp.weixin.qq.com/s?__biz=MzU1MzQxOTU4Mw==&mid=2247492333&idx=1",
                "source_url": "",
                "cover": "https://mmbiz.qpic.cn/mmbiz_jpg/example/0?wx_fmt=jpeg",
                "subtype": 9,
                "is_multi": 0,
                "author": "Example author",
                "copyright_stat": 201,
                "duration": 0,
                "del_flag": 1,
                "item_show_type": 0,
                "audio_fileid": 0,
                "play_url": "",
                "malicious_title_reason_id": 0,
                "malicious_content_type": 0,
            }
        }
    }


wechat_python/src/common/logger.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os

from pathlib import Path
from loguru import logger

from core.conf import ST






class Logger:
    """Factory that configures the shared loguru logger with a file sink."""

    @staticmethod
    def log() -> logger:
        """Attach a rotating file sink to the loguru logger and return it.

        The sink writes to ``FastBlog.log`` inside ``ST.LOG_PATH``; the
        directory is created first when it does not exist yet.

        NOTE: every call adds another sink, so call this once per process
        (the module does so right below the class).

        :return: the configured loguru logger
        """
        # exist_ok=True replaces the former exists()/mkdir() pair: no race
        # between the check and the creation, and missing parent directories
        # are created as well.
        os.makedirs(ST.LOG_PATH, exist_ok=True)

        # Log file location.
        log_file = os.path.join(ST.LOG_PATH, "FastBlog.log")

        # loguru sink configuration
        # more: https://github.com/Delgan/loguru#ready-to-use-out-of-the-box-without-boilerplate
        logger.add(
            log_file,
            encoding='utf-8',
            level="DEBUG",
            rotation='00:00',  # start a new log file every day at 00:00
            retention="7 days",  # old log files are cleaned up automatically
            enqueue=True,  # async-safe
            backtrace=True,  # error tracing
            diagnose=True,
        )

        return logger


# Module-level logger shared by the rest of the project.
logger = Logger().log()

wechat_python/src/crud/article_crud.py

# -*- coding: utf-8 -*-
import time
from typing import List, NoReturn

from sqlalchemy import  select, update,insert 
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload, noload
from sqlalchemy.sql import Select,text

from sqlalchemy import func
from datetime import date, datetime, timedelta
from models.article import ArticleInfo
from schemas.article import CreateArticle
 
async def create_article(db: AsyncSession, obj_in: CreateArticle) -> None:
    """Insert one article row and commit it.

    :param db: open async session used for the insert
    :param obj_in: validated article payload; its dumped fields are passed
        to ``ArticleInfo`` as keyword arguments
    :return: nothing (the former ``NoReturn`` annotation wrongly declared
        that the function never returns)
    """
    db_obj = ArticleInfo(**obj_in.model_dump())
    db.add(db_obj)
    await db.commit()

async def get_article_by_datetime(db: AsyncSession, datetime: int) -> ArticleInfo:
    """Fetch the first article whose ``datetime`` column equals *datetime*.

    :param db: open async session used for the query
    :param datetime: integer value to match against ``ArticleInfo.datetime``
    :return: the first matching row, or ``None`` when no row matches
    """
    stmt = select(ArticleInfo).where(ArticleInfo.datetime == datetime)
    result = await db.execute(stmt)
    return result.scalars().first()



    # async def get_user_by_id(self, db: AsyncSession, user_id: int) -> User:
    #     return await self.get(db, user_id)

    # async def get_user_roles(self, db: AsyncSession, user_id: int) -> list:
    #     se = await db.execute(
    #         select(self.model)
    #         .where(self.model.id == user_id)
    #         .options(selectinload(self.model.roles))
    #     )
    #     role_ids = [r.id for r in se.scalars().first().roles]
    #     return role_ids

    # async def get_userinfo_by_id(self, db: AsyncSession, user_id: int) -> User:
    #     user = await db.execute(
    #         select(self.model)
    #         .options(selectinload(self.model.department))
    #         .options(noload(self.model.roles))
    #         .where(self.model.id == user_id)
    #     )
    #     return user.scalars().first()


    # res = await db.execute(insert(self.model).values(obj_in.dict()))


    # async def get_img_by_timestamp(self, db: AsyncSession, timestamp: int) -> Img:
    #     img = await db.execute(select(self.model).where(self.model.created_at ==datetime.fromtimestamp(timestamp) ))
    #     return img.scalars().first()
    
    # async def get_imgs_by_dev_id(self, db: AsyncSession, id: int) -> List[Img]:
    #     img = await db.execute(select(self.model).where(self.model.dev_id == id ))
    #     return img.scalars().all()






    # async def get_imgs_group_by_station_id(self, db: AsyncSession,content_type:str) -> List[Img]:
    #     if content_type == 'temp':
    #         textual_sql = text("""SELECT station.dev_id,station_id,content_temperature,max(datetime(created_at)),x,y FROM img INNER JOIN station WHERE img.station_id = station.id GROUP BY station_id;""")
    #         # textual_sql = textual_sql.columns(Station.dev_id,self.model.station_id,self.model.content_temperature,self.model.created_at,Station.x,Station.y)
    #     else:
    #         textual_sql = text("""SELECT station.dev_id,station_id,content_humidity,max(datetime(created_at)),x,y FROM img INNER JOIN station WHERE img.station_id = station.id GROUP BY station_id;""")
    #     res = await db.execute(textual_sql)

    #     #  将查询的结果转换成字典形式
    #     keys =list(res.keys())
    #     keys[3] = 'created_at'
    #     data = [dict(zip(keys, re)) for re in res]
    #     return data 


        # img = await db.execute(select(self.model.id,self.model.station_id,self.model.created_at).group_by(self.model.station_id,self.model.created_at))
        # first_date = datetime.now() - timedelta(days=1)
        # first_date = datetime.now() - timedelta(days=10)
        # last_date = datetime.now()
        # img = await db.execute((select(self.model.dev_id).where(self.model.created_at.between(first_date,last_date)).order_by(self.model.created_at)).group_by(self.model.station_id))
        # res = await db.execute(select(self.model.station_id).group_by(self.model.station_id))
        # res = await db.execute(text("select * from img"))



        #  获取一段时间的图片,返回一个list
        # first_date = datetime(2023, 2, 21,11,8,0)
        # last_date = datetime(2023, 2, 21,11,9,0)
        # img = await db.execute(select(self.model).where(self.model.created_at.between(first_date,last_date)  ))
        # return img.scalars().all()
 


    # async def get_station_by_id(self, db: AsyncSession, id: int) ->Station:
    #     db_obj = await db.execute(select(self.model).where(self.model.id == id))
    #     return db_obj.scalars().first()



    # async def get_user_by_username_with_roles(self, db: AsyncSession, username: str) -> User:
    #     user = await db.execute(
    #         select(self.model)
    #         .options(selectinload(self.model.roles))
    #         .where(self.model.username == username)
    #     )
    #     return user.scalars().first()

    # async def update_user_login_time(self, db: AsyncSession, username: str) -> int:
    #     user = await db.execute(
    #         update(self.model)
    #         .where(self.model.username == username)
    #         .values(last_login=func.now())
    #     )
    #     return user.rowcount

    # async def get_email_by_username(self, db: AsyncSession, username: str) -> str:
    #     user = await self.get_user_by_username(db, username)
    #     return user.email

    # async def get_username_by_email(self, db: AsyncSession, email: str) -> str:
    #     user = await db.execute(select(self.model).where(self.model.email == email))
    #     return user.scalars().first().username

    # async def get_avatar_by_username(self, db: AsyncSession, username: str) -> str:
    #     user = await self.get_user_by_username(db, username)
    #     return user.avatar

    # async def create_user(self, db: AsyncSession, user: CreateUser) -> NoReturn:
    #     user.password = jwt.get_hash_password(user.password)
    #     new_user = self.model(**user.dict(exclude={'role_id'}))
    #     roles_list = []
    #     for i in user.role_id:
    #         roles_list.append(await db.get(Role, i))
    #     new_user.roles = roles_list
    #     db.add(new_user)

    # async def update_userinfo(self, db: AsyncSession, current_user: User, obj: UpdateUser) -> int:
    #     dept = await db.get(Department, obj.department_id)
    #     obj.department_id = dept.id
    #     u = await db.execute(
    #         update(self.model)
    #         .where(self.model.id == current_user.id)
    #         .values(**obj.dict(exclude={'role_id'}))
    #     )
    #     # step1 删除所有角色
    #     for i in list(current_user.roles):
    #         current_user.roles.remove(i)
    #     # step2 添加新的角色
    #     role_list = []
    #     for i in obj.role_id:
    #         role_list.append(await db.get(Role, i))
    #     current_user.roles = role_list
    #     return u.rowcount

    # async def update_avatar(self, db: AsyncSession, current_user: User, avatar: str) -> int:
    #     return await self.update(db, current_user.id, {'avatar': avatar})

    # async def delete_user(self, db: AsyncSession, user_id: int) -> int:
    #     return await self.delete(db, user_id)

    # async def check_email(self, db: AsyncSession, email: str) -> User:
    #     mail = await db.execute(select(self.model).where(self.model.email == email))
    #     return mail.scalars().first()

    # async def delete_avatar(self, db: AsyncSession, user_id: int) -> int:
    #     return await self.update(db, user_id, {'avatar': None})

    # async def reset_password(self, db: AsyncSession, username: str, password: str) -> int:
    #     user = await db.execute(
    #         update(self.model)
    #         .where(self.model.username == username)
    #         .values(password=jwt.get_hash_password(password))
    #     )
    #     return user.rowcount

    # def get_users(self) -> Select:
    #     return select(self.model)\
    #         .order_by(self.model.time_joined.desc())\
    #         .options(selectinload(self.model.department))\
    #         .options(noload(self.model.roles))

    # async def get_user_is_super(self, db: AsyncSession, user_id: int) -> bool:
    #     user = await self.get_user_by_id(db, user_id)
    #     return user.is_superuser

    # async def get_user_is_active(self, db: AsyncSession, user_id: int) -> bool:
    #     user = await self.get_user_by_id(db, user_id)
    #     return user.is_active

    # async def super_set(self, db: AsyncSession, user_id: int) -> int:
    #     super_status = await self.get_user_is_super(db, user_id)
    #     user = await db.execute(
    #         update(User)
    #         .where(User.id == user_id)
    #         .values(is_superuser=False if super_status else True)
    #     )
    #     return user.rowcount


    # async def active_set(self, db: AsyncSession, user_id: int) -> int:
    #     active_status = await self.get_user_is_active(db, user_id)
    #     user = await db.execute(
    #         update(User)
    #         .where(User.id == user_id)
    #         .values(is_active=False if active_status else True)
    #     )
    #     return user.rowcount



wechat_python/src/db/sqlite.py

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
from contextlib import asynccontextmanager
import sys

from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy.ext.declarative import declarative_base

from common.logger import logger
# from model  import MappedBase 

from core.conf import ST

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import DeclarativeBase,declared_attr





""" 
说明:SqlAlchemy
"""

# Async SQLite URL (aiosqlite driver); the database file is ``wechat.sqlite``
# inside ST.SQLITE_PATH.
SQLALCHEMY_DATABASE_URL = f'sqlite+aiosqlite:///{ST.SQLITE_PATH}/wechat.sqlite'

try:
    # Database engine.
    # NOTE(review): engine creation presumably does not open a connection
    # yet, so this handler most likely fires only for a malformed URL or a
    # missing driver -- confirm.
    async_engine = create_async_engine(SQLALCHEMY_DATABASE_URL, future=True)
except Exception as e:
    logger.error('❌ 数据库链接失败 {}', e)
    # Exit with a non-zero status so callers and shells can see the failure;
    # a bare sys.exit() reports success (status 0).
    sys.exit(1)
else:
    # Session factory: no autoflush, and objects stay usable after commit.
    async_db_session = async_sessionmaker(bind=async_engine, autoflush=False, expire_on_commit=False)



# Declarative base class that every ORM model in the project inherits from.
MappedBase = declarative_base()


@asynccontextmanager
async def get_async_session() -> AsyncSession:
    """
    Async context manager that yields a fresh database session.

    If the managed block raises an ``Exception`` the session is rolled back
    and the error is re-raised; the session is closed in every case.

    :return: the session, bound for the duration of the ``async with`` block
    """
    db = async_db_session()
    try:
        yield db
    except Exception:
        # Discard whatever the failed block left pending, then propagate.
        await db.rollback()
        raise
    finally:
        await db.close()



def async_session(func):
    """Decorator that injects a database session as the first argument.

    The wrapped coroutine function is called as
    ``func(session, *args, **kwargs)`` inside ``get_async_session()``, so
    callers pass only the remaining arguments.

    :param func: coroutine function whose first parameter is the session
    :return: coroutine function with the session parameter supplied
    """
    # Function-scope import: this module does not import functools at the top.
    import functools

    # wraps() keeps the decorated function's __name__, __doc__ and
    # __wrapped__, which the plain closure used to hide.
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        async with get_async_session() as session:
            return await func(session, *args, **kwargs)
    return wrapper






async def create_table():
    """
    Create every table registered on ``MappedBase``.

    The directory that holds the SQLite file is created first: SQLite can
    create a missing database file but not a missing parent directory, and
    nothing else in this module creates ``ST.SQLITE_PATH``.

    Only models that have already been imported are registered on
    ``MappedBase.metadata``, so import them before calling this.
    """
    # Function-scope import: this module does not import os at the top.
    import os

    os.makedirs(ST.SQLITE_PATH, exist_ok=True)
    async with async_engine.begin() as conn:
        await conn.run_sync(MappedBase.metadata.create_all)















wechat_python/src/services/article_service.py

from db.sqlite import async_session 
from sqlalchemy.ext.asyncio import AsyncSession
from schemas.article import CreateArticle

from crud import article_crud


@async_session
async def create_wechat_article(db: AsyncSession, article: CreateArticle):
    """Store *article* unless a row with the same ``datetime`` already exists.

    The session argument is supplied by the ``async_session`` decorator, so
    callers pass only ``article``.

    :param db: session injected by the decorator
    :param article: validated article payload to insert
    """
    existing = await article_crud.get_article_by_datetime(db, article.datetime)
    if not existing:
        await article_crud.create_article(db, article)



标签:fiddler,get,python,微信,self,db,user,msg,id
From: https://www.cnblogs.com/zhuoss/p/17521662.html

相关文章

  • python: Enum
     #encoding:utf-8#Author:geovindu,GeovinDu涂聚文.#IDE:PyCharm2023.1python11#Datetime:2023/7/221:18#User:geovindu#Product:PyCharm#Project:pythonStudyDemo#File:CheckSort.py#explain:学习frome......
  • python之numpy模块
    1NumPy是什么?NumPy(全称:NumericPython)是python的第三方模块,主要用于计算、处理一维或多维数组。Numpy通常与Scipy(Python科学计算库),Matplotlib(Python绘图库),Pandas(Python数据处理)等组合使用,这样可以广泛的代替Matlab的使用。2为什么使用NumPy?Python中没有内置数组(array)类......
  • 【Python自制工具软件】批量图片转PDF小工具——PIC2PDF
    楔子大家在工作当中总会冒出各种各样的需求,尤其当面对繁琐的工作时。“如果有那样一款想象中的工具就好了!”可以瞬间解决手头工作的想象中的工具,是否存在呢?当然,然而获得它总是需要我们花费大量的时间去筛选甄别,有的充斥大量广告,有的则需要付出不菲的费用。其实完全可以自己上......
  • Python | 文件处理
    文件的读写文件对象在python中用open()可以创建一个文件对象。open()使用方法:open(file,mode='r',buffering=-1,encoding=None,errors=None,newline=None,closefd=True,opener=None)参数说明:file:必需,文件路径(相对或者绝对路径)。mode:可选,文件打开模式(常用)buf......
  • Google Colab:云端的Python编程神器
    GoogleColab,全名GoogleColaboratory,是GoogleResearch团队开发的一款云端编程工具,它允许任何人通过浏览器编写和执行Python代码。Colab尤其适合机器学习、数据分析和教育目的。它是一种托管式Jupyter笔记本服务,用户无需设置,就可以直接使用,同时还能获得GPU等计算资源的免费使用......
  • Python | with关键字详解
    with使用背景对于系统资源如文件、数据库连接、socket而言,应用程序打开这些资源并执行完业务逻辑之后,必须做的一件事就是要关闭(释放)该资源。比如Python程序打开一个文件,往文件中写内容,写完之后,就要关闭该文件,如果不关闭会出现什么情况呢?极端情况下会出现Toomanyopenfiles......
  • python: multiple inheritance
    多继承"""Mother.py多继承类inheritedAnimalpython(类名)superjava继承可以使用extends和implements这两个关键字来实现继承C++:public类名C#:类名可以重写父类方法edit:geovindu,GeovinDudate:20230702IDE:PyCharm2023.1.2"""importsysimportosi......
  • python中globals()的用法
    python中globals()的用法 1.获取所有的全局变量,获取到的内容如下:{'__name__':'__main__','__doc__':None,'__package__':None,'__loader__':<_frozen_importlib_external.SourceFileLoader object at 0x7efc4bd1d960>,......
  • Python入门
    一、逻辑运算符的一些记录#非布尔值的与或运算#当我们对非布尔值进行与或运算时,Python会将其当做布尔值运算,最终会返回原值#与运算的规则#与运算是找False的,如果第一个值是False,则不看第二个值#如果第一个值是False,则直接返回第一个值,否则返回第二个值#或......
  • Python - 编写Unicode 字符串
    Python的字符串字面量支持"\xNN"十六进制字节值转义以及"\uNNNN"和"\UNNNNNNNN"Unicode转义。第一种形式用4位十六进制数编码2字节(16位)位字符码点第二种形式用8位十六进制数编码4字节(32位)码点。十六进制值0xCD和0xE8,是ASCII的7位字符范围之外的两个特殊的声调字符......