# Code listing (originally shown under a "click to view code" toggle on the source page).
import re
import requests
from lxml import etree
import asyncio
import aiohttp
import aiofiles
import os
from Crypto.Cipher import AES
# Get the URL of the first-level m3u8 file and download it
def first_m3u8_url(url):
    """Return the first-level m3u8 URL embedded in the play page.

    Fetches the page at ``url``, reads the text of the inline script found
    at a fixed XPath location and extracts the value assigned by
    ``var now="...";``.

    Args:
        url: Address of the HTML play page.

    Returns:
        The string captured from the ``var now="...";`` assignment.

    Raises:
        ValueError: If the script node cannot be found or it contains no
            ``var now="...";`` assignment (e.g. the page layout changed).
    """
    # NOTE(review): verify=False disables TLS certificate verification. It is
    # kept from the original; presumably the site's certificate is not
    # trusted -- confirm before reusing this elsewhere.
    # The context manager closes the response even if parsing raises.
    with requests.get(url, verify=False) as resp:
        html = etree.HTML(resp.text)

    # etree.HTML returns None for an empty document.
    scripts = []
    if html is not None:
        scripts = html.xpath(
            '/html/body/div/div[1]/div[1]/div[1]/div[2]/div/script[1]/text()'
        )
    if not scripts:
        raise ValueError(f'player script not found in page: {url}')

    match = re.search(r'var now="(?P<first_url>.*?)";', scripts[0], re.S)
    if match is None:
        raise ValueError(f'no "var now" assignment found in page: {url}')
    return match.group('first_url')
def m3u8_download(url, name):
    """Fetch ``url`` and write the raw response body to the file ``name``.

    Args:
        url: Address to request.
        name: Path of the file to create or overwrite (binary mode).
    """
    response = requests.get(url)
    body = response.content
    with open(name, 'wb') as out_file:
        out_file.write(body)
    response.close()
# Get the URL of the second-level m3u8 file and download it
def second_m3u8_url(url):
    """Resolve and download the playlist referenced by ``first_m3u8.txt``.

    Reads ``first_m3u8.txt`` from the current directory, ignores blank lines
    and lines starting with ``#``, and resolves the last remaining entry
    against ``url``. The resulting playlist is saved as ``second_m3u8.txt``.

    Args:
        url: URL that ``first_m3u8.txt`` was downloaded from; used as the
            base for resolving the entry.

    Returns:
        The absolute URL of the second-level playlist.

    Raises:
        ValueError: If ``first_m3u8.txt`` contains no playlist entry.
    """
    from urllib.parse import urljoin

    second_url = None
    with open('first_m3u8.txt', mode='r', encoding='utf-8') as f:
        for raw_line in f:
            entry = raw_line.strip()
            # A blank line (e.g. a trailing newline) must not replace a real
            # entry with an empty path.
            if not entry or entry.startswith('#'):
                continue
            # urljoin handles absolute-path, relative and full-URL entries
            # without assuming a fixed number of path segments in ``url``.
            second_url = urljoin(url, entry)

    if second_url is None:
        raise ValueError('first_m3u8.txt contains no playlist entry')

    m3u8_download(second_url, 'second_m3u8.txt')
    return second_url
# Read the URLs of all ts files, build the correct download addresses, and download them asynchronously
async def download_ts(url, name, session):
    """Download one segment and store it as ``video/<name>``.

    Args:
        url: Address of the segment to fetch.
        name: File name to use inside the ``video`` directory.
        session: Object whose ``get(url)`` is used as an async context
            manager for the request (an ``aiohttp.ClientSession`` in
            ``aio_download``).
    """
    target_path = f'video/{name}'
    async with session.get(url) as response:
        async with aiofiles.open(target_path, mode='wb') as out_file:
            payload = await response.content.read()
            await out_file.write(payload)
    print(f"{name} 完成")
async def aio_download(url):
    """Download every segment listed in ``second_m3u8.txt`` concurrently.

    Each non-blank, non-``#`` line of ``second_m3u8.txt`` is resolved against
    ``url`` and handed to ``download_ts``; the file name is the part of the
    entry after its last ``/``. All downloads share one ``aiohttp`` session.

    Args:
        url: URL that ``second_m3u8.txt`` was downloaded from; used as the
            base for resolving segment entries.

    Raises:
        RuntimeError: If one or more segment downloads failed. The first
            failure is attached as the cause.
    """
    from urllib.parse import urljoin

    # download_ts writes into video/, so make sure the directory exists.
    os.makedirs('video', exist_ok=True)

    tasks = []
    async with aiohttp.ClientSession() as session:
        async with aiofiles.open('second_m3u8.txt', mode='r', encoding='utf-8') as f:
            async for raw_line in f:
                entry = str(raw_line).strip()
                # Blank lines carry no segment; previously they crashed on
                # the rsplit index.
                if not entry or entry.startswith('#'):
                    continue
                file_name = entry.rsplit('/', 1)[-1]
                ts_url = urljoin(url, entry)
                tasks.append(
                    asyncio.create_task(download_ts(ts_url, file_name, session))
                )

        # Nothing to wait for (asyncio.wait would raise on an empty list).
        if not tasks:
            return

        # Wait inside the session context so the session stays open until
        # every download has finished; collect failures instead of losing them.
        results = await asyncio.gather(*tasks, return_exceptions=True)

    failures = [r for r in results if isinstance(r, BaseException)]
    if failures:
        raise RuntimeError(
            f'{len(failures)} of {len(results)} segment downloads failed'
        ) from failures[0]
# Read the encryption information from the playlist file
def get_key(url):
    """Fetch the key referenced by the first ``#EXT-X-KEY`` line.

    Scans ``second_m3u8.txt`` for the first line starting with
    ``#EXT-X-KEY``, extracts its ``URI="..."`` attribute, resolves it against
    ``url`` and returns the response body as text.

    Args:
        url: Base URL used to resolve the key URI.

    Returns:
        The key response decoded as text (``requests``' ``Response.text``).

    Raises:
        ValueError: If ``second_m3u8.txt`` has no ``#EXT-X-KEY`` line or that
            line carries no ``URI`` attribute.
    """
    from urllib.parse import urljoin

    uri_pattern = re.compile(r'URI="(?P<URI>.*?)"', re.S)
    key_uri = None
    with open('second_m3u8.txt', mode='r', encoding='utf-8') as f:
        for line in f:
            if line.startswith('#EXT-X-KEY'):
                found = uri_pattern.search(line)
                if found is not None:
                    key_uri = found.group('URI')
                # Only the first key line is considered, as before.
                break

    if key_uri is None:
        raise ValueError(
            'second_m3u8.txt has no #EXT-X-KEY line with a URI attribute'
        )

    # urljoin copes with absolute-path, relative and full-URL key URIs.
    key_url = urljoin(url, key_uri)
    # The context manager releases the connection once the body is read.
    with requests.get(key_url) as resp:
        return resp.text
# Decrypt all ts files asynchronously
async def dec_ts(key, name):
    """Decrypt ``video/<name>`` into ``video/temp_<name>`` using AES-CBC.

    The cipher is built from ``key`` encoded as UTF-8 and an IV made of
    sixteen ASCII ``0`` characters. The whole input file is read, decrypted
    in one call and written out.

    Args:
        key: Key as a ``str``.
        name: File name of the segment inside the ``video`` directory.
    """
    cipher = AES.new(key=key.encode('utf-8'), IV=b"0000000000000000", mode=AES.MODE_CBC)
    source_path = f'video/{name}'
    target_path = f'video/temp_{name}'
    async with aiofiles.open(source_path, mode='rb') as encrypted_file:
        async with aiofiles.open(target_path, mode='wb') as plain_file:
            encrypted = await encrypted_file.read()
            decrypted = cipher.decrypt(encrypted)
            await plain_file.write(decrypted)
    print(f'{name}完成')
async def aio_dec(key):
    """Decrypt every segment listed in ``second_m3u8.txt`` concurrently.

    For each non-blank, non-``#`` line of ``second_m3u8.txt`` the part after
    the last ``/`` is taken as the segment file name and passed to
    ``dec_ts`` together with ``key``.

    Args:
        key: Key forwarded unchanged to ``dec_ts``.

    Raises:
        RuntimeError: If one or more segments failed to decrypt. The first
            failure is attached as the cause.
    """
    tasks = []
    async with aiofiles.open('second_m3u8.txt', mode='r', encoding='utf-8') as f:
        async for raw_line in f:
            entry = str(raw_line).strip()
            # Blank lines carry no segment; previously they crashed on the
            # rsplit index.
            if not entry or entry.startswith('#'):
                continue
            file_name = entry.rsplit('/', 1)[-1]
            tasks.append(asyncio.create_task(dec_ts(key, file_name)))

    # Nothing to wait for (asyncio.wait would raise on an empty list).
    if not tasks:
        return

    # Wait for every task and collect failures instead of losing them.
    results = await asyncio.gather(*tasks, return_exceptions=True)
    failures = [r for r in results if isinstance(r, BaseException)]
    if failures:
        raise RuntimeError(
            f'{len(failures)} of {len(results)} segments failed to decrypt'
        ) from failures[0]
# Concatenate the decrypted ts files into a single output file.
def merge_ts():
    """Concatenate the decrypted segments into ``movie.mp4``.

    Segment order follows ``second_m3u8.txt``: every non-blank, non-``#``
    line contributes ``video/temp_<name>``, where ``<name>`` is the part of
    the line after its last ``/``. The files are appended byte-for-byte in
    that order.

    The merge is done in Python rather than through a shell ``copy /b``
    command, so it is not limited by command-line length, does not depend on
    the Windows shell, and never passes file names through a shell.

    Raises:
        FileNotFoundError: If a listed ``video/temp_*`` file is missing.
    """
    import shutil

    parts = []
    with open('second_m3u8.txt', mode='r', encoding='utf-8') as f:
        for raw_line in f:
            entry = raw_line.strip()
            # Blank lines carry no segment; previously they crashed on the
            # rsplit index.
            if not entry or entry.startswith('#'):
                continue
            file_name = entry.rsplit('/', 1)[-1]
            parts.append(f'video/temp_{file_name}')

    with open('movie.mp4', mode='wb') as merged:
        for part in parts:
            with open(part, mode='rb') as segment:
                # Streams in chunks, so a segment is never held fully in memory.
                shutil.copyfileobj(segment, merged)
    print('ok')
def main(url):
    """Run the full download, decrypt and merge pipeline for one play page.

    Args:
        url: Address of the HTML play page.
    """
    # Step 1: locate and save the first-level playlist.
    top_playlist_url = first_m3u8_url(url)
    m3u8_download(top_playlist_url, 'first_m3u8.txt')

    # Step 2: resolve and save the second-level playlist, then fetch segments.
    nested_playlist_url = second_m3u8_url(top_playlist_url)
    asyncio.run(aio_download(nested_playlist_url))

    # Step 3: fetch the key (resolved against the first-level URL) and decrypt.
    segment_key = get_key(top_playlist_url)
    asyncio.run(aio_dec(segment_key))

    # Step 4: join the decrypted segments.
    merge_ts()
if __name__ == "__main__":
    # Script entry point: process a single hard-coded play page.
    video_url = "https://www.99meijutt.com/play/97071-0-0.html"
    main(video_url)