首页 > 编程语言 >Python使用pyppeteer搭建网页截图api

Python使用pyppeteer搭建网页截图api

时间:2023-01-18 11:15:16浏览次数:42  
标签:args Python pyppeteer S3 api import path await loop

因为跨语言需要,打算把pyppeteer、图片压缩、aws S3封装成一个api来调用。

首先自然是要安装依赖 pip3 install pillow boto3 pyppeteer

运行一次脚本,pyppeteer会自动下载最新的浏览器到本地

实测windows启动Chrome时需要设置headless=False 阅读几篇关于pyppeteer的文章你应该知道我在说什么。

请求用GET/POST均可,GET将参数放进query,POST将参数放进消息体,使用application/x-www-urluncoded

# 使用GET
curl -i "http://127.0.0.1:9871/?token=xry1029&uri=https://www.baidu.com"
# 使用POST
curl -X POST -i "http://127.0.0.1:9871" --data "token=xry1029" --data "uri=https://www.baidu.com"

切记uri需要加http://,不需要的功能就注释掉吧...

#!/usr/bin/python3
# author: 如雨yu
# date: 2023/1/18
# httpd import
import http.server as BaseHTTPServer
import socketserver as SocketServer
import urllib.parse as urlparse
import threading
import re
import argparse
import json
# chrome import
from pyppeteer import launch, connect
import asyncio
import traceback
import time
import sys
# 图片压缩 import
import os
from PIL import Image
from PIL import ImageFile
# aws S3 import
import os
import boto3
import base64

def compress_image(outfile, savefile, kb=1536, quality=75, k=0.75): # tx服务器貌似只要1.5m以内的图
    """
    :param outfile: 要压缩的文件
    :param savefile: 导出文件
    :param kb: 压缩目标,KB
    :param k: 每次调整的压缩比率
    :param quality: 初始压缩比率
    :return: 压缩文件地址,压缩文件大小
    outfile => savefile
    """
    o_size = os.path.getsize(outfile) // 1024 # 函数返回为字节,除1024转为kb(1kb = 1024 bit)
    if o_size <= kb:
        os.rename(outfile,savefile)
        print('[compress] 无需压缩')
        return savefile

    ImageFile.LOAD_TRUNCATED_IMAGES = True # 防止图像被截断而报错

    while o_size > kb:
        im = Image.open(outfile)
        x, y = im.size
        out = im.resize((int(x*k), int(y*k)), Image.Resampling.LANCZOS)
        try:
            out.save(outfile, quality=quality) # quality 质量
        except Exception as e:
            print(e)
            break
        o_size = os.path.getsize(outfile) // 1024
    os.rename(outfile,savefile)
    print('[compress] 压缩完成')
    return savefile


async def launchchrome():
    # 全局化变量
    global WsToken
    #global browser
    browser = await launch(autoClose=False,userDataDir='/home/user/user-data/',args=['--disable-infobars','--disable-gpu'])
    WsToken = browser.wsEndpoint
    print ("[browser] 浏览器启动完成!Ws: "+str(WsToken))
    await browser.disconnect()


loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(launchchrome())
loop.close()

async def screenshot(Width, Height, Uri, Output, JSexec, SleepTime, FullPage):
    # 连接浏览器
    browser = await connect(browserWSEndpoint=WsToken) 
    page = await browser.newPage()
    # 设置页面视图大小
    await page.setViewport(viewport={'width':Width,'height':Height})
    # 是否启用JS,enabled设为False,则无渲染效果
    await page.setJavaScriptEnabled(enabled=True)
    # 跳到地址
    page.setDefaultNavigationTimeout(15000)  #渲染时间15s 多了就是出现未知的bug
    # 屏蔽webdriver
    await page.evaluateOnNewDocument('Object.defineProperty(navigator, "webdriver", {get: () => false})')
    await page.goto(Uri)
    # 运行js
    await page.evaluate(JSexec)
    # 等待时间
    time.sleep(SleepTime)
    # 截图
    Saveput = Output
    Output =  Saveput+'-cache.png'
    # 长截图...
    if FullPage == "true":
        await page.screenshot({'path': Output, 'fullPage': True})
    else:
        await page.screenshot({'path': Output})
    # 压缩一下
    compress_image(Output, Saveput)
    # 断开浏览器
    print('[browser] 生成图片完成')
    await page.close() #关掉标签防止内存太多ww
    await browser.disconnect()

async def StopChrome():
    # 连接浏览器
    browser = await connect(browserWSEndpoint=WsToken) 
    # 关掉!
    await browser.close()
    await browser.disconnect()

def upload_files(path_local, path_s3):
    """
    上传(重复上传会覆盖同名文件)
    :param path_local: 本地路径
    :param path_s3: s3路径
    """
    print(f'[aws S3] Start upload files.')
 
    if not upload_single_file(path_local, path_s3):
        raise Exception(f'[aws S3] Upload files failed.')
 
    print(f'[aws S3] Upload files successful.')
 
 
def upload_single_file(src_local_path, dest_s3_path):
    """
    上传单个文件
    用upload_files方法
    :param src_local_path:
    :param dest_s3_path:
    :return:
    """
    try:
        with open(src_local_path, 'rb') as f:
            s3.upload_fileobj(f, BUCKET_NAME, dest_s3_path)
    except Exception as e:
        print(f'[aws S3] Upload data failed. | src: {src_local_path} | dest: {dest_s3_path} | Exception: {e}')
        return False
    #print(f'[aws S3] Uploading file successful. | src: {src_local_path} | dest: {dest_s3_path}')
    return True

# 开始操作s3
BUCKET_NAME = ""  # 存储桶名称

# aws_access_key_id和aws_secret_access_key
# 使用base64加密一下
S3_AKI = b''
S3_SAK = b''
# str类型
CN_S3_AKI = base64.b64decode(S3_AKI).decode('utf-8') 
CN_S3_SAK = base64.b64decode(S3_SAK).decode('utf-8')
CN_S3_AKI = CN_S3_AKI.replace("'", "")
CN_S3_SAK = CN_S3_SAK.replace("'", "")
CN_REGION_NAME = '' #前缀域名
ENDPOINT_URL = '' # endpoint 端点域名 一定看给的那个啊!!!
# 打开实例
s3_session = boto3.Session(region_name=CN_REGION_NAME,
                  aws_access_key_id=CN_S3_AKI,
                  aws_secret_access_key=CN_S3_SAK)
s3 = s3_session.client("s3", endpoint_url=ENDPOINT_URL)
print('[aws S3] 初始化完成')


class apiHandler(BaseHTTPServer.BaseHTTPRequestHandler):
    def do_GET(self):
        path,args=urlparse.splitquery(self.path)
        self._response(path, args)

    def do_POST(self):
        args = self.rfile.read(int(self.headers['content-length'])).decode("utf-8")
        self._response(self.path, args)

    def _response(self, path, args):
        self.send_response(200)
        self.send_header('Content-type','text/plain')
        self.end_headers()
        # 开始call chrome
        # 参数处理
        if args:
                args=urlparse.parse_qs(args).items()
                args=dict([(k,v[0]) for k,v in args])
        else:
                args={}
        
        Uri=args.get("uri","https://www.baidu.com")
        ScreenShotPath=args.get("path","test.jpg")
        Height=int(args.get("h","1080"))
        Token=args.get("token","null")
        Width=int(args.get("w","1920"))
        FullPage=args.get("fullpage","false")
        JSexec=args.get("jsexec", "void(0);")
        SleepTime=int(args.get("sleeptime", "0"))
        err_str = None
        try:
            # 在这里可以指定一个token 没有token不生成截图
            if Token != '':
                print("[httpd] 无token访问...")
                # 抛出无token
                raise Exception('Bad Token')
            else:
                loop = asyncio.new_event_loop()
                asyncio.set_event_loop(loop)
                loop.run_until_complete(screenshot(Width, Height, Uri, "<输出图片地址>"+ScreenShotPath, JSexec, SleepTime, FullPage))
                loop.close()
        except Exception as e:
            err_str = '服务器错误: '+str(e) #+"\n"+traceback.format_exc()

        if err_str is not None:
            response_str = err_str
        else:
            # 上传s3
            upload_files('<输出图片地址>'+ScreenShotPath, 'data/'+ScreenShotPath)
            response_str = "https://<s3域名>/data/"+ScreenShotPath
        self.wfile.write(response_str.encode("UTF-8"))

class ThreadedServer(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer):
    def __init__(self, *args,**kwargs):
        self.screen_lock = threading.Lock()
        BaseHTTPServer.HTTPServer.__init__(self, *args, **kwargs)

    def safe_print(self,*args,**kwargs):
        try:
            self.screen_lock.acquire()
            print(*args,**kwargs)
        finally:
            self.screen_lock.release()



if __name__=="__main__":
    parser=argparse.ArgumentParser()
    parser.add_argument('-ip','--address',required=False,help='IP address to listen. Default is 127.0.0.1',default='127.0.0.1')
    parser.add_argument('-p','--port',type=int,help='port to bind',default=9087)
    args = parser.parse_args()
    
    server = ThreadedServer((args.address, args.port), apiHandler)
 
    #start the server
    print('[httpd] Server is Ready. %s:%s' % (args.address, args.port))
    
    while True:
        try:
            server.handle_request()
        except KeyboardInterrupt:
            break
        
    server.safe_print("[main] Control-C hit: Exiting server...")
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.run_until_complete(StopChrome())
    loop.close()

如果要关掉chrome呢?当然不是手动ctrl+c然后kill

用这个,重新连接chrome浏览器并且关闭

#!/usr/bin/python3
from pyppeteer import connect
import asyncio
global WsToken
f = open("/home/user/user-data/DevToolsActivePort", "r",encoding='utf-8')
Port = f.readline() # 第一行 有\n
Port = Port.replace('\n', '')
Token = f.readline() # 第二行 没有
WsToken = 'ws://127.0.0.1:'+Port+Token
async def StopChrome():
    browser = await connect(browserWSEndpoint=WsToken) 
    await browser.close()
    await browser.disconnect() # 离谱 关浏览器还要断连...

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(StopChrome())
loop.close()

大概1.5s一张图,还是不太稳定..希望有大佬能帮忙提提意见

标签:args,Python,pyppeteer,S3,api,import,path,await,loop
From: https://www.cnblogs.com/tianpanyu/p/17059346.html

相关文章

  • python re 正则表达式
    1.匹配两个数字importreres=re.search(r"[0-9]{2}","sadfsda35C")#正则表达式为[0-9]{2}#匹配的字符串为sadfsda35Cprint(res)  2.匹配两个数字(另一种......
  • python复习功课
    一、类方法(实例方法、类方法、静态方法)使用方式:1.实例方法是必须实例化可访问构建方法中的实例属性,也可通过类名去使用类属性,常用是实例化类给到一个类对象,用类对象.方法......
  • pyenv离线安装Python
    ​​welcometomyblog​​使用pyenv安装python3.7.4,下载了很久没有下载成功,于是尝试离线安装一.手动下载python3.7.4pyenv安装python时会给出下载地址,去这个地址下......
  • Python爬虫-第四章-2-协程与异步
    协程:    单线程执行多任务执行时,当执行中程序处于I/O期间,异步可以让CPU选择性的切换到其他任务上#DemoDescribe:协程importasyncioimporttime'''协程所针对的......
  • python方法(函数)
    定义格式def方法名(参数1,参数2,参数3):#具体实现return#返回值参数默认值defdesc(name='no-name',age=0):print("%s%d"%(name,age))#调用desc......
  • python序列
    类似于Java和C的数组,但python的”数组“可操作性更强,以下是常用APIinsert指定位置插入arr=[0,1,20,3,40,5,60,7,80,9]#下标1位置后加入值,结果[0,1,81,......
  • python pip实用手册
    pip是python的包安装工具,类似于JavaScript的npm和yarn设置国内源国内源清华https://pypi.tuna.tsinghua.edu.cn/simple阿里http://mirrors.aliyun.com/pypi/simpl......
  • day14-常用API
    1.API1.1API概述【理解】什么是API​ API(ApplicationProgrammingInterface):应用程序编程接口java中的API​ 指的就是JDK中提供的各种功能的Java类,这些类......
  • day15-常用API
    1.时间日期类1.1Date类(应用)计算机中时间原点1970年1月1日00:00:00时间换算单位1秒=1000毫秒Date类概述Date代表了一个特定的时间,精确到毫秒Date类构......
  • 【Python】爬虫笔记-开源代理池haipproxy使用
    大规模的数据采集需要用到代理池来突破IP封锁。一般代理池的构建是先爬取网上的免费代理,校验后存到数据库中,再提供给其他程序api。github上有很多现成的代理池能拿来用,在......