首页 > 其他分享 >网站视频如何下载,如何在最短的时间内下载m3u8文件中的ts文件,并将其合并成MP4格式的文件

网站视频如何下载,如何在最短的时间内下载m3u8文件中的ts文件,并将其合并成MP4格式的文件

时间:2024-12-02 21:54:15浏览次数:4  
标签:文件 m3u8 ts file print path os 下载

在现实生活中,我是一个二次元爱好者,每当番剧更新时我总会第一时间去观看,但无论是哪个平台,加载视频总是很缓慢,如果你的网络并不是特别好,有时甚至看几秒就卡顿,非常影响观影体验。但有些网站并不支持离线下载,这时候你想要下载视频应该怎么办呢?下面推荐一个解析和下载视频的插件——万能视频下载神器:

使用方法非常简单,只需打开自己想要下载的视频网页它就会帮你找到对应的m3u8文件然后点击解析:

划到下面就可以选择下载的方式了,可以选择自己想要的文件格式以及文件存放路径:

img点击并拖拽以移动编辑

该插件能够检测识别绝大多数视频网页,但它有个致命缺陷,它的下载速度很缓慢,因此为了加快下载的速度,我写了一个线程池和协程函数搭配使用的python脚本,那下载速度是相当哇塞!,并且执行完脚本就是一个通用的MP4文件,而非一堆.ts文件。首先你需要将那个m3u8文件下载下来或者将解析出来的内容复制到一个新建文本文件中,待会儿执行python脚本时需要从中读取数据。

该脚本分为7个函数,下面是每个函数的详细讲解:

def get_ts_urls(m3u8_path):
    with open(m3u8_path, 'r', encoding='utf-8') as f:
        lines = f.readlines()
        ts_urls = []
        length = len(lines[0])
        for line in lines:
            if line.startswith('h') and len(line) == length:
                ts_urls.append(line.strip('\n'))
        return ts_urls

点击并拖拽以移动

该代码负责获取每个ts文件的下载地址,也就是之前下载的能够m3u8文件(自己复制的那个txt文件也是可以的)但有些网页解析出来的路径可能不全,因此有时会需要自行拼接文件下载地址路径,传入该文件的存放路径,就能读取里面的数据,返回ts文件下载地址的列表。代码中的length是为了过滤那些广告的ts文件下载路径而存在的,这样就不会下载广告了。

def merge_ts_to_mp4(ts_dir, output_file):
    ts_files = sorted([f for f in os.listdir(ts_dir) if f.endswith('.ts')])
    if not ts_files:
        print("指定目录下没有找到 TS 文件。")
        return

    file_list = os.path.join(ts_dir, 'filelist.txt')
    with open(file_list, 'w', encoding='utf-8') as f:
        for ts_file in ts_files:
            full_path = os.path.join(ts_dir, ts_file).replace(os.sep, '/')
            f.write(f"file '{full_path}'\n")

    command = [
        'ffmpeg',
        '-f', 'concat',
        '-safe', '0',
        '-i', file_list.replace(os.sep, '/'),
        '-c', 'copy',
        '-bsf:a', 'aac_adtstoasc',
        output_file
    ]

    try:
        subprocess.run(command, check=True)
        print(f"成功将 TS 文件合并为 {output_file}")
    except subprocess.CalledProcessError as e:
        print(f"合并 TS 文件时出错: {e}")
    finally:
        os.remove(file_list)
        files = [f for f in os.listdir(ts_dir) if f.endswith('.ts')]
        for file in files:
            file_path = os.path.join(ts_dir, file)
            try:
                os.remove(file_path)
            except OSError as e:
                print(f"删除文件 {file_path} 时出错: {e}")

点击并拖拽以移动

该函数是负责合并下载的ts文件的,当成功合并时会删除下载的ts文件,无需再手动删除。如果成功合并会打印文件的存放路径,失败则会打印提示信息。

def count_ts_files(directory):
    """
    统计指定目录下的 .ts 文件数量。

    :param directory: 需要统计的目录路径
    :return: .ts 文件的数量
    """
    try:
        # 获取目录下的所有文件和子目录
        entries = os.listdir(directory)
        # 过滤出 .ts 文件
        ts_files = [entry for entry in entries if entry.endswith('.ts')]
        return len(ts_files)
    except (FileNotFoundError, PermissionError) as e:
        print(f"无法访问指定的目录 {directory}: {e}")
        return 0

点击并拖拽以移动

该函数的功能是统计下载的ts文件个数,我们只需比较下载的ts文件个数和ts文件下载路径的列表长度是否相等,即可知道下载任务是否完成,以便进行合并操作。

async def download_ts(session, url, save_dir, max_retries=3):
    for attempt in range(max_retries + 1):
        try:
            async with session.get(url) as response:
                if response.status == 200:
                    filename = os.path.join(save_dir, os.path.basename(url))
                    async with aiofiles.open(filename, 'wb') as f:
                        await f.write(await response.read())
                    # print(f"下载成功: {url}")
                    return
                else:
                    print(f"下载失败: {url}, 状态码为: {response.status}")
        except Exception as e:
            if attempt < max_retries:
                print(f"下载 {url} 失败,尝试第 {attempt + 1} 次重试: {e}")
            else:
                print(f"下载 {url} 失败,达到最大重试次数: {e}")
                raise

点击并拖拽以移动

该函数是下载单个ts文件的异步函数,这个函数有重试机制,如果某个ts文件下载异常,它会重新再次进行下载,直到下载成功为止,最大重试次数默认是3,可以根据喜好自行设置。

async def download_all_ts(ts_urls, save_dir):
    async with aiohttp.ClientSession() as session:
        tasks = [download_ts(session, url, save_dir) for url in ts_urls]
        await asyncio.gather(*tasks)

点击并拖拽以移动

这个 download_all_ts 函数是一个异步函数,用于下载一组 TS (Transport Stream) 文件。它利用了 aiohttp 库来处理异步 HTTP 请求,并且可以同时处理多个下载任务。

def download_ts_in_thread(ts_urls, save_dir):
    asyncio.run(download_all_ts(ts_urls, save_dir))

点击并拖拽以移动

该函数是为了配合线程池的使用在线程中运行异步函数。

def main(base_path, max_workers=10):
    m3u8_path = os.path.join(base_path, 'index.m3u8')
    if not os.path.exists(m3u8_path):
        print('文件不存在!')
        return
    ts_urls = get_ts_urls(m3u8_path)
    print(f"下载的ts文件总数为: {len(ts_urls)}")

    # 使用tqdm为下载线程任务添加进度条
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(download_ts_in_thread, ts_urls[i::max_workers], base_path) for i in
                   range(max_workers)]
        total_tasks = len(futures)
        with tqdm(total=total_tasks, desc="下载进度", unit="任务") as pbar:
            for future in concurrent.futures.as_completed(futures):
                future.result()
                pbar.update(1)

    file_count = count_ts_files(base_path)
    if file_count == len(ts_urls):
        output_file = os.path.join(base_path, 'output.mp4')
        merge_ts_to_mp4(base_path, output_file)
    else:
        print('ts文件下载不完整!')
        files = [f for f in os.listdir(base_path) if f.endswith('.ts')]
        for file in files:
            file_path = os.path.join(base_path, file)
            try:
                os.remove(file_path)
            except OSError as e:
                print(f"删除文件 {file_path} 时出错: {e}")

点击并拖拽以移动

该函数负责整个python的运行逻辑,线程池就是在该函数中创建的,下列是完整代码:

import os
import time
import asyncio
import aiohttp
import aiofiles
import concurrent.futures
import subprocess
from tqdm import tqdm  # 导入tqdm库用于显示进度条


def get_ts_urls(m3u8_path):
    with open(m3u8_path, 'r', encoding='utf-8') as f:
        lines = f.readlines()
        ts_urls = []
        length = len(lines[0])
        for line in lines:
            if line.startswith('h') and len(line) == length:
                ts_urls.append(line.strip('\n'))
        return ts_urls


def merge_ts_to_mp4(ts_dir, output_file):
    ts_files = sorted([f for f in os.listdir(ts_dir) if f.endswith('.ts')])
    if not ts_files:
        print("指定目录下没有找到 TS 文件。")
        return

    file_list = os.path.join(ts_dir, 'filelist.txt')
    with open(file_list, 'w', encoding='utf-8') as f:
        for ts_file in ts_files:
            full_path = os.path.join(ts_dir, ts_file).replace(os.sep, '/')
            f.write(f"file '{full_path}'\n")

    command = [
        'ffmpeg',
        '-f', 'concat',
        '-safe', '0',
        '-i', file_list.replace(os.sep, '/'),
        '-c', 'copy',
        '-bsf:a', 'aac_adtstoasc',
        output_file
    ]

    try:
        subprocess.run(command, check=True)
        print(f"成功将 TS 文件合并为 {output_file}")
    except subprocess.CalledProcessError as e:
        print(f"合并 TS 文件时出错: {e}")
    finally:
        os.remove(file_list)
        files = [f for f in os.listdir(ts_dir) if f.endswith('.ts')]
        for file in files:
            file_path = os.path.join(ts_dir, file)
            try:
                os.remove(file_path)
            except OSError as e:
                print(f"删除文件 {file_path} 时出错: {e}")



def count_ts_files(directory):
    """
    统计指定目录下的 .ts 文件数量。

    :param directory: 需要统计的目录路径
    :return: .ts 文件的数量
    """
    try:
        # 获取目录下的所有文件和子目录
        entries = os.listdir(directory)
        # 过滤出 .ts 文件
        ts_files = [entry for entry in entries if entry.endswith('.ts')]
        return len(ts_files)
    except (FileNotFoundError, PermissionError) as e:
        print(f"无法访问指定的目录 {directory}: {e}")
        return 0


async def download_ts(session, url, save_dir, max_retries=3):
    for attempt in range(max_retries + 1):
        try:
            async with session.get(url) as response:
                if response.status == 200:
                    filename = os.path.join(save_dir, os.path.basename(url))
                    async with aiofiles.open(filename, 'wb') as f:
                        await f.write(await response.read())
                    # print(f"下载成功: {url}")
                    return
                else:
                    print(f"下载失败: {url}, 状态码为: {response.status}")
        except Exception as e:
            if attempt < max_retries:
                print(f"下载 {url} 失败,尝试第 {attempt + 1} 次重试: {e}")
            else:
                print(f"下载 {url} 失败,达到最大重试次数: {e}")
                raise


async def download_all_ts(ts_urls, save_dir):
    async with aiohttp.ClientSession() as session:
        tasks = [download_ts(session, url, save_dir) for url in ts_urls]
        await asyncio.gather(*tasks)


def download_ts_in_thread(ts_urls, save_dir):
    asyncio.run(download_all_ts(ts_urls, save_dir))


def main(base_path, max_workers=10):
    m3u8_path = os.path.join(base_path, 'index.m3u8')
    if not os.path.exists(m3u8_path):
        print('文件不存在!')
        return
    ts_urls = get_ts_urls(m3u8_path)
    print(f"下载的ts文件总数为: {len(ts_urls)}")

    # 使用tqdm为下载线程任务添加进度条
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(download_ts_in_thread, ts_urls[i::max_workers], base_path) for i in
                   range(max_workers)]
        total_tasks = len(futures)
        with tqdm(total=total_tasks, desc="下载进度", unit="任务") as pbar:
            for future in concurrent.futures.as_completed(futures):
                future.result()
                pbar.update(1)

    file_count = count_ts_files(base_path)
    if file_count == len(ts_urls):
        output_file = os.path.join(base_path, 'output.mp4')
        merge_ts_to_mp4(base_path, output_file)
    else:
        print('ts文件下载不完整!')
        files = [f for f in os.listdir(base_path) if f.endswith('.ts')]
        for file in files:
            file_path = os.path.join(base_path, file)
            try:
                os.remove(file_path)
            except OSError as e:
                print(f"删除文件 {file_path} 时出错: {e}")


if __name__ == "__main__":
    # m3u8_url = input('请输入index.m3u8文件存放地址:')
    m3u8_url=r'D:\Desktop\爬虫代码\ts文件下载以及合并\data'
    max_workers = 5
    start_time = time.time()
    main(m3u8_url, max_workers)
    all_time = time.time() - start_time
    print(f"总耗时:{all_time}")

点击并拖拽以移动

以上便是全部内容,希望对各位有所帮助!

标签:文件,m3u8,ts,file,print,path,os,下载
From: https://www.cnblogs.com/xiaohu1/p/18582804

相关文章

  • 【Azure ADLS】为Azure Data Lake Storage的Container赋予了操作权限后创建子文件夹遇
    问题描述在ADF操作StorageAccount(AzureDataLakeStorage),在已经为根Container赋予了权限后,创建子文件夹的时候还是报错403"Thisrequestisnotauthorizedtoperformthisoperationusingthispermission"403  问题解答这是因为ADLSContainer的ACL权限有两......
  • 记spring boot中基于Logback的常用日志文件配置
    1.默认情况下直接依赖<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency>2.创建一个名为:logback-spring.xml的文件3.在yml配置文件中激活注:优先级logback.xml(默认)> logback-sp......
  • NFS网络文件系统
    目录1.NFS介绍2.NFS的主要特点3.NFS的工作原理4.NFS服务端安装配置5.NFS客户端挂载使用6.NFS挂载参数7.NFS挂载实例8.NFS总结本篇文章给大家介绍NFS相关知识,NFS是网络文件系统的缩写,主要功能是通过网络让不同主机系统之间可以共享文件和目录。NFS系统和Windows网络共......
  • Keil下载芯片包时找不到根目录的办法
    最近因为课题的原因,需要下载新版的KEIL(旧版2017版的烧录新版本的程序时有问题),下载好之后发现无法找到根目录:错误摘要是:Refresh Pack descriptionE: the specified CMsls Pack Root directorydoes NoT exist! Please take a moment to review ifthe valu......
  • 5-文件上传漏洞
    1、文件上传漏洞原理1.1一句话木马<?php@eval($_POST['xu']);?>其中@表示忽略错误,eval()函数表示把传进去的字符串作为php代码执行从httppost里面拿到参数叫xu的value,然后作为代码去执行,并忽略错误2、Webshell介绍一句话木马、大马、小马、图片马都是webshell中的一种......
  • 【Nginx学习】5大绝招揭秘:Nginx进程间通信机制之互斥锁——文件锁实现的ngx_shmtx_t锁
    ......
  • 文件
    文件复位将文件指针复位的2种方法第一种rewind(fp);第二种fseek(fp,0L,SEEK_SET)起始点表示符号数字表示文件首SEEK—SET0当前位置SEEK—CUR1文件末尾SEEK—END2例如:fseek(fp,100L,0);其意义是把位置指针移到离文件首100个字节处。fseek函数一般用于二进制文......
  • Windows系统文件gamewidget.dll:游戏与性能的双重保障
    在Windows系统中,gamewidget.dll是一个至关重要的动态链接库(DynamicLinkLibrary,简称DLL)文件。它不仅是系统正常运行所必需的组件,更是游戏性能和稳定性的重要保障。本文将深入探讨gamewidget.dll文件的作用、重要性以及如何确保其正常运行,以维护游戏与系统的双重性能。一、gam......
  • 优化Hudi索引文件的性能的方法
    Hudi索引文件是Hudi数据湖框架中的一个关键组件。它主要用于记录数据记录(通常通过主键来标识)与存储位置之间的映射关系。就像图书馆的索引系统一样,能够帮助快速定位到具体数据存储的位置,从而实现高效的数据更新、插入和删除操作。在大数据环境中,没有高效的索引,数据操作......
  • ElementUI 下载文件前后端代码
    前端代码store中的js文件import{excelExportTemplate}from'@/api/xxxxx'asyncexcelExportTemplate({commit},fieldConfig){varres=awaitnewPromise((resolve,reject)=>{excelExportTemplate(fieldConfig).then(response=>{......