#!/usr/bin/env python # encoding: utf-8 import requests, os, platform, time from Crypto.Cipher import AES import multiprocessing from retrying import retry class M3u8: ''' This is a main Class, the file contains all documents. One document contains paragraphs that have several sentences It loads the original file and converts the original file to new content Then the new content will be saved by this class ''' def __init__(self): ''' Initial the custom file by self ''' self.encrypt = False self.headers = { "User-Agent": "Mozilla/5.0 (Windows NT 6.1; Win64; x64; rv:78.0) Gecko/20100101 Firefox/78.0" } def hello(self): ''' This is a welcome speech :return: self return self def checkUrl(self, url): ''' Determine if it is a available link of m3u8 :return: bool ''' if '.m3u8' not in url: return False elif not url.startswith('http'): return False else: return True def parse(self, url): ''' Analyze a link of m3u8 :param url: string, the link need to analyze :return: list ''' container = list() response = self.request(url).text.split('\n') for ts in response: if '.ts' in ts: container.append(ts) if '#EXT-X-KEY:' in ts: self.encrypt = True return container def getEncryptKey(self, url): ''' Access to the secret key :param url: string, Access to the secret key by the url :return: string ''' encryptKey = self.request("{}/key.key".format(url)).content return encryptKey def aesDecode(self, data, key): ''' Decode the data :param data: stream, the data need to decode :param key: secret key :return: decode the data ''' crypt = AES.new(key, AES.MODE_CBC, key) plain_text = crypt.decrypt(data) return plain_text.rstrip(b'\0') def download(self, queue, sort, file, downPath, url): ''' Download the debris of video :param queue: the queue :param sort: which number debris :param file: the link of debris :param downPath: the path to save debris :param url: the link of m3u8 :return: None ''' queue.put(file) baseUrl = '/'.join(url.split("/")[:-1]) if self.encrypt: self.encryptKey = self.getEncryptKey(baseUrl) if not file.startswith("http"): file = baseUrl + '/' +file debrisName = "{}/{}.ts".format(downPath, sort) if not os.path.exists(debrisName): response = self.request(file) with open(debrisName, "wb") as f: if self.encrypt: data = self.aesDecode(response.content, self.encryptKey) f.write(data) f.flush() else: f.write(response.content) f.flush() def progressBar(self, queue, count): ''' Show progress bar :param queue: the queue :param count: the number count of debris :return: None ''' print('---一共{}个碎片...'.format(count)) offset = 0 while True: offset += 1 file = queue.get() rate = offset * 100 / count print("\r%s下载成功,当前进度%0.2f%%, 第%s/%s个" % (file, rate, offset, count)) if offset >= count: break @retry(stop_max_attempt_number=3) def request(self, url, params): ''' Send a request :param url: the url of request :param params: the params of request :return: the result of request ''' response = requests.get(url, params=params, headers=self.headers, timeout=10) assert response.status_code == 200 return response def run(self): ''' program entry, Input basic information ''' downPath = str(input("碎片的保存路径, 默认./Download:")) or "./Download" savePath = str(input("视频的保存路径, 默认./Complete:")) or "./Complete" clearDebris = bool(input("是否清除碎片, 默认True:")) or True saveSuffix = str(input("视频格式, 默认ts:")) or "ts" while True: url = str(input("请输入合法的m3u8链接:")) if self.checkUrl(url): break # create a not available folder if not os.path.exists(downPath): os.mkdir(downPath) if not os.path.exists(savePath): os.mkdir(savePath) # start analyze a link of m3u8 print('---正在分析链接...') container = self.parse(url) print('---链接分析成功...') # run processing to do something print('---进程开始运行...') po = multiprocessing.Pool(30) queue = multiprocessing.Manager().Queue() size = 0 for file in container: sort = str(size).zfill(5) po.apply_async(self.download, args=(queue, sort, file, downPath, url,)) size += 1 po.close() self.progressBar(queue, len(container)) print('---进程运行结束...') # handler debris sys = platform.system() saveName = time.strftime("%Y%m%d_%H%M%S", time.localtime()) print('---文件合并清除...') if sys == "Windows": os.system("copy /b {}/*.ts {}/{}.{}".format(downPath, savePath, saveName, saveSuffix)) if clearDebris: os.system("rmdir /s/q {}".format(downPath)) else: os.system("cat {}/*.ts>{}/{}.{}".format(downPath, savePath, saveName, saveSuffix)) if clearDebris: os.system("rm -rf {}".format(downPath)) print('---合并清除完成...') print('---任务下载完成...') print('---欢迎再次使用...') if __name__ == "__main__": M3u8().hello().run()
import requests from lxml import etree import m3u8 from Crypto.Cipher import AES headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHT" "ML, like Gecko) Chrome/75.0.3770.100 Safari/537.36" } base_url = 'http://www.tcmmooc.com' cookies = None def handle_login(): """ 登陆获取用户cookies """ res_1 = requests.get("http://www.tcmmooc.com/login", headers=headers) cookie_1 = res_1.cookies.get_dict() html = etree.HTML(res_1.text) token = html.xpath('//*[@id="login-form"]/input[2]/@value') res_2 = requests.post("http://www.tcmmooc.com/login_check", headers=headers, data={ "_username": "************", "_password": "*************", "_remember_me": "on", "_target_path": "http://www.tcmmooc.com/", "_csrf_token": token }, cookies=cookie_1) cookie_2 = res_2.cookies.get_dict() cookie_dict = {} cookie_dict.update(cookie_1) cookie_dict.update(cookie_2) global cookies cookies = cookie_dict def get_html(url): """ 获取url的请求结果并返回封装的html :param url: 请求的url :return: html """ res = requests.get(url, headers=headers, cookies=cookies) html = etree.HTML(res.text) return html def handle_start_m3u8_url(url): """ 处理m3u8视频url的请求的url :param url: m3u8视频的url来源ajax的url :return: 返回m3u8视频的最后一个链接url """ m3u8_content = requests.get(url, headers=headers, cookies=cookies).text lines_list = m3u8_content.strip().split('\r\n') if len(lines_list) < 3: lines_list = m3u8_content.strip().split('\n') if '#EXTM3U' not in m3u8_content: raise BaseException('非M3U8连接') return lines_list[-1] def handle_m3u8_data(m3u8_url): """ 下载m3u8视频 :param m3u8_url: 最后m3u8视频的url """ m3u8_obj = m3u8.load(m3u8_url) # 导入url,返回m3u8结果的对象 a = 0 key = requests.get(m3u8_obj.keys[0].uri, headers=headers, cookies=cookies).content # 获取aes加密的url结果 for i in m3u8_obj.keys: # 循环密匙 for seg in m3u8_obj.segments.by_key(i): # 循环密匙对应的m3u8视频 res = requests.get(seg.uri, headers=headers, cookies=cookies) # 获取m3u8视频片段的url返回的加密视频结果 iv = bytes.fromhex(seg.key.iv[2:]) # 提取aes加密的iv值,每个视频片段的iv值不同 content_video_part = AES.new(key, AES.MODE_CBC, iv).decrypt(res.content) # 对视频结果进行解密 with open("all\\text.MP4", 'ab') as f: # 追加保存解密结果 f.write(content_video_part) print(a) a += 1 def get_video_url(): """ 获取m3u8视频连接的url,然后处理并下载视频 """ html = get_html('http://www.tcmmooc.com/course/6342') video_url = base_url + html.xpath('//ul[@id="course-item-list"]/li[1]/a/@data-url')[0] html = get_html(video_url) data_player_url = base_url + html.xpath('//div[@id="lesson-preview-player"]/@data-player-url')[0] html = get_html(data_player_url) data_url = html.xpath('//div[@id="lesson-video-content"]/@data-url')[0] fina_m3u8_url = handle_start_m3u8_url(data_url) handle_m3u8_data(fina_m3u8_url) if __name__ == "__main__": handle_login() # 登陆 get_video_url() # 下载
标签:加密,m3u8,python,self,param,url,return,data From: https://www.cnblogs.com/xkdn/p/17010968.html