首页 > 编程语言 >python 下载m3u8 加密与非加密 收集

python 下载m3u8 加密与非加密 收集

时间:2022-12-28 18:35:46浏览次数:46  
标签:加密 m3u8 python self param url return data

#!/usr/bin/env python
# encoding: utf-8
import requests, os, platform, time
from Crypto.Cipher import AES
import multiprocessing
from retrying import retry

class M3u8:
    '''
     This is a main Class, the file contains all documents.
     One document contains paragraphs that have several sentences
     It loads the original file and converts the original file to new content
     Then the new content will be saved by this class
    '''
    def __init__(self):
        '''
        Initial the custom file by self
        '''
        self.encrypt = False
        self.headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 6.1; Win64; x64; rv:78.0) Gecko/20100101 Firefox/78.0"
        }

    def hello(self):
        '''
        This is a welcome speech
        :return: self
       return self

    def checkUrl(self, url):
        '''
        Determine if it is a available link of m3u8
        :return: bool
        '''
        if '.m3u8' not in url:
            return False
        elif not url.startswith('http'):
            return False
        else:
            return True

    def parse(self, url):
        '''
        Analyze a link of m3u8
        :param url: string, the link need to analyze
        :return: list
        '''
        container = list()
        response = self.request(url).text.split('\n')
        for ts in response:
            if '.ts' in ts:
                container.append(ts)
            if '#EXT-X-KEY:' in ts:
                self.encrypt = True
        return container

    def getEncryptKey(self, url):
        '''
        Access to the secret key
        :param url: string, Access to the secret key by the url
        :return: string
        '''
        encryptKey = self.request("{}/key.key".format(url)).content
        return encryptKey

    def aesDecode(self, data, key):
        '''
        Decode the data
        :param data: stream, the data need to decode
        :param key: secret key
        :return: decode the data
        '''
        crypt = AES.new(key, AES.MODE_CBC, key)
        plain_text = crypt.decrypt(data)
        return plain_text.rstrip(b'\0')

    def download(self, queue, sort, file, downPath, url):
        '''
        Download the debris of video
        :param queue: the queue
        :param sort: which number debris
        :param file: the link of debris
        :param downPath: the path to save debris
        :param url: the link of m3u8
        :return: None
        '''
        queue.put(file)

        baseUrl = '/'.join(url.split("/")[:-1])

        if self.encrypt:
            self.encryptKey = self.getEncryptKey(baseUrl)

        if not file.startswith("http"):
            file = baseUrl + '/' +file

        debrisName = "{}/{}.ts".format(downPath, sort)

        if not os.path.exists(debrisName):
            response = self.request(file)
            with open(debrisName, "wb") as f:
                if self.encrypt:
                    data = self.aesDecode(response.content, self.encryptKey)
                    f.write(data)
                    f.flush()
                else:
                    f.write(response.content)
                    f.flush()

    def progressBar(self, queue, count):
        '''
        Show progress bar
        :param queue: the queue
        :param count: the number count of debris
        :return: None
        '''
        print('---一共{}个碎片...'.format(count))
        offset = 0
        while True:
            offset += 1
            file = queue.get()
            rate = offset * 100 / count
            print("\r%s下载成功,当前进度%0.2f%%, 第%s/%s个" % (file, rate, offset, count))
            if offset >= count:
                break

    @retry(stop_max_attempt_number=3)
    def request(self, url, params):
        '''
        Send a request
        :param url: the url of request
        :param params: the params of request
        :return: the result of request
        '''
        response = requests.get(url, params=params, headers=self.headers, timeout=10)
        assert response.status_code == 200
        return response

    def run(self):
        '''
        program entry, Input basic information
        '''
        downPath = str(input("碎片的保存路径, 默认./Download:")) or "./Download"
        savePath = str(input("视频的保存路径, 默认./Complete:")) or "./Complete"
        clearDebris = bool(input("是否清除碎片, 默认True:")) or True
        saveSuffix = str(input("视频格式, 默认ts:")) or "ts"

        while True:
            url = str(input("请输入合法的m3u8链接:"))
            if self.checkUrl(url):
                break

        # create a not available folder
        if not os.path.exists(downPath):
            os.mkdir(downPath)

        if not os.path.exists(savePath):
            os.mkdir(savePath)

        # start analyze a link of m3u8
        print('---正在分析链接...')
        container = self.parse(url)
        print('---链接分析成功...')

        # run processing to do something
        print('---进程开始运行...')
        po = multiprocessing.Pool(30)
        queue = multiprocessing.Manager().Queue()
        size = 0
        for file in container:
            sort = str(size).zfill(5)
            po.apply_async(self.download, args=(queue, sort, file, downPath, url,))
            size += 1

        po.close()
        self.progressBar(queue, len(container))
        print('---进程运行结束...')

        # handler debris
        sys = platform.system()
        saveName = time.strftime("%Y%m%d_%H%M%S", time.localtime())

        print('---文件合并清除...')
        if sys == "Windows":
            os.system("copy /b {}/*.ts {}/{}.{}".format(downPath, savePath, saveName, saveSuffix))
            if clearDebris:
                os.system("rmdir /s/q {}".format(downPath))
        else:
            os.system("cat {}/*.ts>{}/{}.{}".format(downPath, savePath, saveName, saveSuffix))
            if clearDebris:
                os.system("rm -rf {}".format(downPath))
        print('---合并清除完成...')
        print('---任务下载完成...')
        print('---欢迎再次使用...')

if __name__ == "__main__":
    M3u8().hello().run()

 

import requests
from lxml import etree
import m3u8
from Crypto.Cipher import AES


headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHT"
                  "ML, like Gecko) Chrome/75.0.3770.100 Safari/537.36"
}
base_url = 'http://www.tcmmooc.com'
cookies = None


def handle_login():
    """
    登陆获取用户cookies
    """
    res_1 = requests.get("http://www.tcmmooc.com/login", headers=headers)
    cookie_1 = res_1.cookies.get_dict()
    html = etree.HTML(res_1.text)
    token = html.xpath('//*[@id="login-form"]/input[2]/@value')
    res_2 = requests.post("http://www.tcmmooc.com/login_check", headers=headers,
                          data={
                              "_username": "************",
                              "_password": "*************",
                              "_remember_me": "on",
                              "_target_path": "http://www.tcmmooc.com/",
                              "_csrf_token": token
                          },
                          cookies=cookie_1)
    cookie_2 = res_2.cookies.get_dict()
    cookie_dict = {}
    cookie_dict.update(cookie_1)
    cookie_dict.update(cookie_2)
    global cookies
    cookies = cookie_dict


def get_html(url):
    """
    获取url的请求结果并返回封装的html
    :param url: 请求的url
    :return: html
    """
    res = requests.get(url, headers=headers, cookies=cookies)
    html = etree.HTML(res.text)
    return html


def handle_start_m3u8_url(url):
    """
    处理m3u8视频url的请求的url
    :param url: m3u8视频的url来源ajax的url
    :return: 返回m3u8视频的最后一个链接url
    """
    m3u8_content = requests.get(url, headers=headers, cookies=cookies).text
    lines_list = m3u8_content.strip().split('\r\n')
    if len(lines_list) < 3:
        lines_list = m3u8_content.strip().split('\n')
    if '#EXTM3U' not in m3u8_content:
        raise BaseException('非M3U8连接')
    return lines_list[-1]


def handle_m3u8_data(m3u8_url):
    """
    下载m3u8视频
    :param m3u8_url: 最后m3u8视频的url
    """
    m3u8_obj = m3u8.load(m3u8_url)  # 导入url,返回m3u8结果的对象
    a = 0
    key = requests.get(m3u8_obj.keys[0].uri, headers=headers, cookies=cookies).content  # 获取aes加密的url结果
    for i in m3u8_obj.keys:  # 循环密匙
        for seg in m3u8_obj.segments.by_key(i):  # 循环密匙对应的m3u8视频
            res = requests.get(seg.uri, headers=headers, cookies=cookies)  # 获取m3u8视频片段的url返回的加密视频结果
            iv = bytes.fromhex(seg.key.iv[2:])  # 提取aes加密的iv值,每个视频片段的iv值不同
            content_video_part = AES.new(key, AES.MODE_CBC, iv).decrypt(res.content)  # 对视频结果进行解密
            with open("all\\text.MP4", 'ab') as f:  # 追加保存解密结果
                f.write(content_video_part)
                print(a)
                a += 1


def get_video_url():
    """
    获取m3u8视频连接的url,然后处理并下载视频
    """
    html = get_html('http://www.tcmmooc.com/course/6342')
    video_url = base_url + html.xpath('//ul[@id="course-item-list"]/li[1]/a/@data-url')[0]
    html = get_html(video_url)
    data_player_url = base_url + html.xpath('//div[@id="lesson-preview-player"]/@data-player-url')[0]
    html = get_html(data_player_url)
    data_url = html.xpath('//div[@id="lesson-video-content"]/@data-url')[0]
    fina_m3u8_url = handle_start_m3u8_url(data_url)
    handle_m3u8_data(fina_m3u8_url)


if __name__ == "__main__":
    handle_login()  # 登陆
    get_video_url()  # 下载

 

标签:加密,m3u8,python,self,param,url,return,data
From: https://www.cnblogs.com/xkdn/p/17010968.html

相关文章

  • Python 爬取微博指定博主所有内容
    这么做有啥用呢,一方面是为了防止他的微博删除,另一方面怕被系统和谐,所以就把他的微博内容爬取下来,然后保存到word文档中,以备不时查看...那么接下来进入爬虫分析环节,小编这里......
  • 使用Python无水印下载抖音图文
    从刚上线时只是一款音乐创意短视频社交软件,到如今涵盖了短视频、直播、社交、购物、本地生活服务,抖音正在急速扩张。抖音的野心还不止于此。今年,抖音上线了新的内容——图文......
  • python 实现抖音通过关键字搜索下载短视频
    在日常生活中,随着短视频的发展,大家使用抖音进行数据搜索,也占了一大部分,今天给大家带来的文章抖音根据关键词进行视频下载有什么作用呢?其实很多时候我们制作视频,写脚本,都需要......
  • Python抖音直播录屏 || 下载,支持多主播同时录制
    抖音作为当今最火的自媒体之一,是自媒体人素材的来源之一,自从电商直播火爆以后,抖音似乎也迎来了业务的新高度,反正直播业务开展如火如荼,那么我们进行直播录制有什么作用呢?在热......
  • Python爬取抖音创作者所有短视频
    小伙伴们,大家好呀,上次给大家分享了​​如何爬取快手up主所有的短视频​​后,不少人在后台留言说,想要爬取抖音up主的所有短视频,那么今天代码就来了。其实有了​​爬取快手up主......
  • MD5安全吗,MD5加密有哪些问题,如何提高安全性?
    MD5是一种散列函数,在计算机安全领域得到广泛应用。然而,MD5国际密码算法被王小云研究团队证实并不安全,因为MD5本身存在一些缺点,这些缺点导致了MD5并不是很安全,可能会带来信息......
  • Python-open函数-读写文件
    一、open函数语法open()函数的作用是打开一个文件,并返回一个file对象(即文件对象)。open是一个动作,可以理解为我们打开文档的点击动作。file对象是一个实物,可以理......
  • python爬取百度图库多张图片
    hello啊,各位小伙伴,眨眼间7月份过去了,八月已经悄然来临,不知道八月大家又立了什么样的flag,作为一个低产的公众号运营者,想想都是一阵莫名奇妙的辛酸,每月就三四篇文章,唉…不多......
  • Python 通过关键词下载百度图片
    打开百度图片后,输入相关关键词,根据分析,发现百度图片初始只会渲染部分图像到页面上,随着滚轮下滚,就会请求新的数据,因此我们可以判断页面是经过ajax请求数据后,渲染至页面。百度......
  • Python 解析西瓜视频 | 无水印高清下载
    很多小伙伴都有使用西瓜视频,现在西瓜视频作为字节旗下的全家桶一员,拥有很多优质的视频,是很多自媒体小伙伴需要的。但是如何无水印下载这些西瓜视频呢?却让很多人为难,缺少这个......