处理流程:
-
下载所有.ts结尾文件
- 下载.m3u8文件
- 解析出内部.ts结尾链接
- 利用协程异步请求所有.ts文件并下载保存
import asyncio import os import aiohttp import requestsfrom sc.common.user_agent import get_ua from functools import partial,wraps m3u8_url = 'https://cdn8.tvtvgood.com/202205/05/d1c93b592b15/playlist.m3u8' pre_ts_url = m3u8_url.rsplit('/',1)[0] ts_dir = './tsLib' if not os.path.exists(ts_dir): os.mkdir(ts_dir) def async_retry(func=None, max_times=10, sleep=0.2, default=None): ''' 异步请求重试装饰器 :param func: :param max_times: 默认请求重试10次 :param sleep: 每次请求重试间隔,默认:0.2秒 :param default: 所有请求均失败后,返回的默认值 :return: ''' if func is None: return partial(async_retry, max_times=max_times, sleep=sleep, default=default) @wraps(func) async def wrap_in(*args, **kwargs): for _ in range(max_times): try: return await func(*args, **kwargs) except Exception as e: print(f'retry {_ + 1} times, error: ', e) await asyncio.sleep(sleep) return default return wrap_in async def gen_ua(): loop = asyncio.get_event_loop() ua = await loop.run_in_executor(None, get_ua) return ua @async_retry(max_times=10, sleep=0.1) async def fetch(ts_url): if ts_url.startswith('/'): url = f'{pre_ts_url}{ts_url}' ts_url = ts_url.lstrip('/') else: url = f'{pre_ts_url}/{ts_url}' async with aiohttp.ClientSession() as sess: headers = { 'user-agent': await gen_ua() } async with await sess.get(url=url, headers=headers) as res: context = await res.read() dic_data = { 'ctx': context, 'ts_url': ts_url } return dic_data def download(t): dic_data = t.result() title = dic_data['ts_url'] context = dic_data['ctx'] with open(f'{ts_dir}/{title}', 'wb') as f: f.write(context) print(f'已下载{title}') def get_tasks(): headers = { 'origin': 'https://php.playerla.com', 'referer': 'https://php.playerla.com/', 'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/105.0.0.0 Safari/537.36', } params = { 'token': 'kaPRmnMBXJbKz9ZfM9_Zgg', 'expires': '1702128062', } response = requests.get(m3u8_url, params=params, headers=headers) ts_list = [line for line in response.text.splitlines() if line.endswith('.ts')] tasks = [] for ts_url in ts_list: task = asyncio.ensure_future(fetch(ts_url)) task.add_done_callback(download) tasks.append(task) return tasks if __name__ == '__main__': tasks = get_tasks() loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks))
-
利用FFmpeg对.ts文件进行合并
#! /usr/bin/env python # -*- coding: utf-8 -*- import os import subprocess def merge_ts_files(input_folder, output_file): # 获取输入文件夹中的所有TS文件 ts_files = [f for f in os.listdir(input_folder) if f.endswith('.ts')] # 生成一个包含所有TS文件路径的文本文件 with open('file_list.txt', 'w') as f: for ts_file in ts_files: f.write(f"file '{os.path.join(input_folder, ts_file)}'\n") # 使用ffmpeg执行合并命令 subprocess.call(['ffmpeg', '-f', 'concat', '-safe', '0', '-i', 'file_list.txt', '-c', 'copy', output_file]) # 删除临时文件 os.remove('file_list.txt') if __name__ == '__main__': input_folder = './tsLib' # 替换为实际的输入文件夹路径 output_file = './output.mp4' # 替换为实际的输出文件路径 merge_ts_files(input_folder, output_file)
-
FFmpeg工具
- 官网下载对应系统版本工具:http://ffmpeg.org/download.html#build
- 配置系统环境变量
- 验证:ffmpeg -version