Since I need to call this from other languages, I decided to wrap pyppeteer, image compression, and AWS S3 uploads into a single HTTP API.
First, install the dependencies: pip3 install pillow boto3 pyppeteer
Run the script once and pyppeteer will automatically download a Chromium build to the local machine.
In my testing, launching Chrome on Windows requires headless=False; if you have read a few articles about pyppeteer you will know what I mean.
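If you want to confirm that the download and launch work on your machine before wiring up the whole API, a minimal smoke test like the sketch below is enough. The output filename smoke-test.png is just a placeholder, and on Windows you would switch to headless=False as mentioned above.
#!/usr/bin/python3
# Minimal pyppeteer smoke test -- the first run triggers the Chromium download.
# On Windows, change headless=True to headless=False as noted above.
import asyncio
from pyppeteer import launch

async def smoke_test():
    browser = await launch(headless=True)
    page = await browser.newPage()
    await page.goto('https://www.baidu.com')
    await page.screenshot({'path': 'smoke-test.png'})  # placeholder output file
    await browser.close()

asyncio.get_event_loop().run_until_complete(smoke_test())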
Requests can be made with either GET or POST: GET puts the parameters in the query string, POST puts them in the request body as application/x-www-form-urlencoded.
# using GET
curl -i "http://127.0.0.1:9871/?token=xry1029&uri=https://www.baidu.com"
# using POST
curl -X POST -i "http://127.0.0.1:9871" --data "token=xry1029" --data "uri=https://www.baidu.com"
Remember that uri must include the http:// (or https://) scheme. Just comment out any features you don't need...
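If the caller is itself Python, the same requests can be made with the requests library (pip3 install requests). This is only a sketch mirroring the curl calls above; the host, port, token, and parameter values are the example values from this post.
#!/usr/bin/python3
# Example client for the API below -- mirrors the curl calls above.
# 127.0.0.1:9871 and token=xry1029 are just the example values from this post.
import requests

resp = requests.post(
    "http://127.0.0.1:9871",
    data={                          # sent as application/x-www-form-urlencoded
        "token": "xry1029",
        "uri": "https://www.baidu.com",
        "w": 1920,
        "h": 1080,
        "fullpage": "false",
        "path": "baidu.jpg",
    },
    timeout=60,
)
print(resp.text)  # on success the server responds with the S3 URL of the screenshot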
#!/usr/bin/python3
# author: 如雨yu
# date: 2023/1/18
# httpd imports
import http.server as BaseHTTPServer
import socketserver as SocketServer
import urllib.parse as urlparse
import threading
import re
import argparse
import json
# chrome imports
from pyppeteer import launch, connect
import asyncio
import traceback
import time
import sys
# image compression imports
import os
from PIL import Image
from PIL import ImageFile
# aws S3 imports
import boto3
import base64
def compress_image(outfile, savefile, kb=1536, quality=75, k=0.75):  # the tx (Tencent) server seems to only accept images under ~1.5 MB
    """
    Compress an image until it is under the target size.
    :param outfile: file to compress
    :param savefile: file to write the result to
    :param kb: target size in KB
    :param quality: save quality used on each pass
    :param k: resize ratio applied on each pass
    :return: path of the compressed file
    outfile => savefile
    """
    o_size = os.path.getsize(outfile) // 1024  # getsize returns bytes; divide by 1024 for KB (1 KB = 1024 bytes)
    if o_size <= kb:
        os.rename(outfile, savefile)
        print('[compress] no compression needed')
        return savefile
    ImageFile.LOAD_TRUNCATED_IMAGES = True  # avoid errors when the image is truncated
    while o_size > kb:
        im = Image.open(outfile)
        x, y = im.size
        out = im.resize((int(x * k), int(y * k)), Image.Resampling.LANCZOS)
        try:
            out.save(outfile, quality=quality)  # quality: save quality
        except Exception as e:
            print(e)
            break
        o_size = os.path.getsize(outfile) // 1024
    os.rename(outfile, savefile)
    print('[compress] compression done')
    return savefile
async def launchchrome():
    # expose the WebSocket endpoint globally
    global WsToken
    # global browser
    browser = await launch(autoClose=False, userDataDir='/home/user/user-data/', args=['--disable-infobars', '--disable-gpu'])
    WsToken = browser.wsEndpoint
    print("[browser] Browser launched! Ws: " + str(WsToken))
    await browser.disconnect()

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(launchchrome())
loop.close()
async def screenshot(Width, Height, Uri, Output, JSexec, SleepTime, FullPage):
    # connect to the already-running browser
    browser = await connect(browserWSEndpoint=WsToken)
    page = await browser.newPage()
    # set the viewport size
    await page.setViewport(viewport={'width': Width, 'height': Height})
    # enable JS; with enabled=False nothing gets rendered
    await page.setJavaScriptEnabled(enabled=True)
    page.setDefaultNavigationTimeout(15000)  # 15 s navigation timeout; longer just runs into unknown bugs
    # mask the webdriver flag
    await page.evaluateOnNewDocument('Object.defineProperty(navigator, "webdriver", {get: () => false})')
    # navigate to the address
    await page.goto(Uri)
    # run the supplied JS
    await page.evaluate(JSexec)
    # wait before taking the shot (asyncio.sleep instead of time.sleep so the event loop is not blocked)
    await asyncio.sleep(SleepTime)
    # take the screenshot
    Saveput = Output
    Output = Saveput + '-cache.png'
    # full-page screenshot...
    if FullPage == "true":
        await page.screenshot({'path': Output, 'fullPage': True})
    else:
        await page.screenshot({'path': Output})
    # compress the result
    compress_image(Output, Saveput)
    # disconnect from the browser
    print('[browser] screenshot generated')
    await page.close()  # close the tab so memory doesn't pile up
    await browser.disconnect()
async def StopChrome():
    # connect to the browser
    browser = await connect(browserWSEndpoint=WsToken)
    # and shut it down
    await browser.close()
    await browser.disconnect()
def upload_files(path_local, path_s3):
    """
    Upload a file (uploading again overwrites the object with the same key).
    :param path_local: local path
    :param path_s3: S3 key
    """
    print(f'[aws S3] Start upload files.')
    if not upload_single_file(path_local, path_s3):
        raise Exception(f'[aws S3] Upload files failed.')
    print(f'[aws S3] Upload files successful.')

def upload_single_file(src_local_path, dest_s3_path):
    """
    Upload a single file; call it through upload_files.
    :param src_local_path:
    :param dest_s3_path:
    :return:
    """
    try:
        with open(src_local_path, 'rb') as f:
            s3.upload_fileobj(f, BUCKET_NAME, dest_s3_path)
    except Exception as e:
        print(f'[aws S3] Upload data failed. | src: {src_local_path} | dest: {dest_s3_path} | Exception: {e}')
        return False
    # print(f'[aws S3] Uploading file successful. | src: {src_local_path} | dest: {dest_s3_path}')
    return True
# set up S3
BUCKET_NAME = ""  # bucket name
# aws_access_key_id and aws_secret_access_key,
# stored here base64-encoded
S3_AKI = b''
S3_SAK = b''
# decode back to str
CN_S3_AKI = base64.b64decode(S3_AKI).decode('utf-8')
CN_S3_SAK = base64.b64decode(S3_SAK).decode('utf-8')
CN_S3_AKI = CN_S3_AKI.replace("'", "")
CN_S3_SAK = CN_S3_SAK.replace("'", "")
CN_REGION_NAME = ''  # region name
ENDPOINT_URL = ''  # endpoint URL -- make sure to use the one your provider gives you!
# create the client
s3_session = boto3.Session(region_name=CN_REGION_NAME,
                           aws_access_key_id=CN_S3_AKI,
                           aws_secret_access_key=CN_S3_SAK)
s3 = s3_session.client("s3", endpoint_url=ENDPOINT_URL)
print('[aws S3] initialized')
class apiHandler(BaseHTTPServer.BaseHTTPRequestHandler):
    def do_GET(self):
        # urlsplit replaces the deprecated urlparse.splitquery
        parsed = urlparse.urlsplit(self.path)
        self._response(parsed.path, parsed.query)
    def do_POST(self):
        args = self.rfile.read(int(self.headers['content-length'])).decode("utf-8")
        self._response(self.path, args)
    def _response(self, path, args):
        self.send_response(200)
        self.send_header('Content-type', 'text/plain')
        self.end_headers()
        # call chrome
        # parse the parameters
        if args:
            args = urlparse.parse_qs(args).items()
            args = dict([(k, v[0]) for k, v in args])
        else:
            args = {}
        Uri = args.get("uri", "https://www.baidu.com")
        ScreenShotPath = args.get("path", "test.jpg")
        Height = int(args.get("h", "1080"))
        Token = args.get("token", "null")
        Width = int(args.get("w", "1920"))
        FullPage = args.get("fullpage", "false")
        JSexec = args.get("jsexec", "void(0);")
        SleepTime = int(args.get("sleeptime", "0"))
        err_str = None
        try:
            # put your API token here (left blank in this post); without the right token no screenshot is generated
            if Token != '':
                print("[httpd] access without a valid token...")
                # reject the request
                raise Exception('Bad Token')
            else:
                loop = asyncio.new_event_loop()
                asyncio.set_event_loop(loop)
                loop.run_until_complete(screenshot(Width, Height, Uri, "<output image path>" + ScreenShotPath, JSexec, SleepTime, FullPage))
                loop.close()
        except Exception as e:
            err_str = 'server error: ' + str(e)  # + "\n" + traceback.format_exc()
        if err_str is not None:
            response_str = err_str
        else:
            # upload to S3
            upload_files('<output image path>' + ScreenShotPath, 'data/' + ScreenShotPath)
            response_str = "https://<s3 domain>/data/" + ScreenShotPath
        self.wfile.write(response_str.encode("UTF-8"))
class ThreadedServer(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer):
    def __init__(self, *args, **kwargs):
        self.screen_lock = threading.Lock()
        BaseHTTPServer.HTTPServer.__init__(self, *args, **kwargs)
    def safe_print(self, *args, **kwargs):
        try:
            self.screen_lock.acquire()
            print(*args, **kwargs)
        finally:
            self.screen_lock.release()
if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument('-ip', '--address', required=False, help='IP address to listen on (default: 127.0.0.1)', default='127.0.0.1')
    parser.add_argument('-p', '--port', type=int, help='port to bind', default=9087)
    args = parser.parse_args()
    server = ThreadedServer((args.address, args.port), apiHandler)
    # start the server
    print('[httpd] Server is ready. %s:%s' % (args.address, args.port))
    while True:
        try:
            server.handle_request()
        except KeyboardInterrupt:
            break
    server.safe_print("[main] Control-C hit: exiting server...")
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.run_until_complete(StopChrome())
    loop.close()
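A note on the S3_AKI / S3_SAK values above: they only need to be base64-encoded once. A one-off helper like the sketch below (the two key strings are placeholders) prints byte literals you can paste straight into the script; generated this way, the .replace("'", "") lines above become harmless no-ops.
#!/usr/bin/python3
# One-off helper: base64-encode the S3 key pair for pasting into S3_AKI / S3_SAK above.
# The two strings are placeholders -- substitute your real keys.
import base64

access_key_id = "YOUR_ACCESS_KEY_ID"
secret_access_key = "YOUR_SECRET_ACCESS_KEY"

print("S3_AKI =", base64.b64encode(access_key_id.encode("utf-8")))
print("S3_SAK =", base64.b64encode(secret_access_key.encode("utf-8")))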
And how do you shut Chrome down? Certainly not by hand with Ctrl+C and kill.
Use the script below, which reconnects to the running Chrome instance and closes it.
#!/usr/bin/python3
from pyppeteer import connect
import asyncio

global WsToken
f = open("/home/user/user-data/DevToolsActivePort", "r", encoding='utf-8')
Port = f.readline()   # first line, ends with \n
Port = Port.replace('\n', '')
Token = f.readline()  # second line, no trailing \n
WsToken = 'ws://127.0.0.1:' + Port + Token

async def StopChrome():
    browser = await connect(browserWSEndpoint=WsToken)
    await browser.close()
    await browser.disconnect()  # oddly, you still have to disconnect after closing the browser...

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(StopChrome())
loop.close()
It takes roughly 1.5 s per screenshot and is still not entirely stable... I'd welcome suggestions from anyone more experienced.