首页 > 编程语言 >python多线程应用-批量下载视频课程(宠医堂)

python多线程应用-批量下载视频课程(宠医堂)

时间:2023-01-09 15:37:07 浏览次数:53
标签:name course python list detail 宠医堂 url 多线程 dir

import os
import re
import shutil
import time
from collections.abc import Iterable
import concurrent.futures
from Crypto.Cipher import AES
import requests
from bs4 import BeautifulSoup

# Module-level mapping shared by every request helper in this file.
# login_and_set_cookie() stores the session cookie under the 'Cookie' key;
# the other functions pass this dict along with their HTTP requests.
cookie_dict = {}


def login_and_set_cookie(username='', password=''):
    '''
    Log in to the site and remember the session cookie in ``cookie_dict``.

    The credentials are posted as a form to the login endpoint. Every cookie
    set by the response is folded into one ``name=value; name=value`` string
    and stored under ``cookie_dict['Cookie']`` so that the dict can be passed
    as request headers. When the response sets no cookie, any stale
    ``'Cookie'`` entry is removed instead of storing an unusable value.

    :param username: sent as the ``mobile`` form field
    :param password: sent as the ``password`` form field
    :return: None
    '''
    login_url = 'http://www.wsavs.com/login/loginin'
    # Passing a mapping lets requests percent-encode the values; a hand-built
    # "a=b&c=d" string is corrupted by '&', '=', '+' or '%' in the password.
    data = {'mobile': username, 'password': password}
    content_type = 'application/x-www-form-urlencoded; charset=UTF-8'
    headers = {'content-type': content_type}
    login_resp = requests.post(url=login_url, data=data, headers=headers)
    # Use the parsed cookie jar rather than the raw Set-Cookie header, which
    # also carries attributes (path, expires, HttpOnly, ...) that do not
    # belong in a Cookie request header.
    cookie = '; '.join(f'{item.name}={item.value}' for item in login_resp.cookies)
    if cookie:
        cookie_dict['Cookie'] = cookie
    else:
        cookie_dict.pop('Cookie', None)


def get_my_course_list():
    '''
    Fetch the "my courses" page and extract the courses listed on it.

    :return: list of ``{'course_name': ..., 'course_url': ...}`` dicts, one
        for each entry that has both a name and a link target; entries
        missing either one are skipped
    '''
    my_course_list_url = 'http://www.wsavs.com/mycenter/mycourse'
    my_course_list_resp = requests.get(url=my_course_list_url, params=None, headers=cookie_dict)
    # 'html.parser' is the stdlib-backed parser, so lxml is not required.
    soup = BeautifulSoup(my_course_list_resp.text, 'html.parser')
    my_course_list = []
    # Each course entry lives in a div whose class matches "flex1".
    for entry in soup.find_all('div', attrs={'class': re.compile("flex1")}):
        name_div = entry.find('div', attrs={'class': re.compile("f18 cor3 mb5")})
        link = entry.find('a')
        course_name = name_div.string if name_div else None
        # A link without an href counts as "no URL". Falling back to the tag
        # object itself (which is always truthy) would store a Tag as the URL.
        course_url = link.get('href') if link else None
        if course_name and course_url:
            my_course_list.append({'course_name': course_name, 'course_url': course_url})
    return my_course_list


def get_course_video_url_for_course_detail_url(course_url_detail, course_name):
    '''
    Fetch a lesson page and return the playlist URL of its video.

    :param course_url_detail: URL of the lesson page
    :param course_name: not used by the lookup; kept so existing callers
        that pass it keep working
    :return: the ``src`` of the ``<source id="source"
        type="application/x-mpegURL">`` element, or None when the element or
        its ``src`` attribute is missing
    '''
    course_detail_video_resp = requests.get(url=course_url_detail, params=None, headers=cookie_dict)
    # 'html.parser' is the stdlib-backed parser, so lxml is not required.
    soup_video = BeautifulSoup(course_detail_video_resp.text, 'html.parser')
    source_tag = soup_video.find('source', attrs={'type': "application/x-mpegURL", "id": "source"})
    if source_tag is None:
        return None
    # Return None rather than the tag object when src is absent or empty, so
    # callers that test the result for truthiness never mistake a Tag for a URL.
    return source_tag.get('src') or None


def get_my_course_detail(course_url, course_name):
    '''
    Fetch a course page and resolve the video URL of every lesson on it.

    :param course_url: URL of the course page
    :param course_name: course title, used as the key of the returned dict
    :return: ``{course_name: [lesson, ...]}`` where each lesson is
        ``{'course_name_detail': <lesson title>, 'course_url_detail': <video URL>}``;
        an empty dict when the page has no lesson section
    '''
    my_course_detail_resp = requests.get(url=course_url, params=None, headers=cookie_dict)
    # 'html.parser' is the stdlib-backed parser, so lxml is not required.
    soup = BeautifulSoup(my_course_detail_resp.text, 'html.parser')
    sections = soup.find_all('div', attrs={'class': re.compile("pl20 pr20 pt20 pb40 xiangqing")})
    course_name_dict = {}
    for section in sections:
        # Accumulate across sections instead of letting a later section
        # overwrite the lessons collected from an earlier one.
        lessons = course_name_dict.setdefault(course_name, [])
        name_divs = section.find_all('div', attrs={'class': re.compile("f16 fb cor3")})
        links = section.find_all('a')
        # Titles and links are paired purely by position within the section.
        for name_div, link in zip(name_divs, links):
            lesson_name = name_div.string
            lesson_page_url = link.get('href')
            # Skip before issuing a request: a link without href is not a URL,
            # and a lesson without a title would be dropped afterwards anyway.
            if not (lesson_name and lesson_page_url):
                continue
            course_video_url = get_course_video_url_for_course_detail_url(
                course_url_detail=lesson_page_url, course_name=course_name)
            # Only accept real strings so a non-URL object can never be stored.
            if isinstance(course_video_url, str) and course_video_url:
                lessons.append({
                    'course_name_detail': lesson_name,
                    'course_url_detail': course_video_url,
                })
    return course_name_dict


def get_ts_url(url_course, cyt_course_dir):
    '''
    Download a playlist and prepare what is needed to fetch its segments.

    The playlist is searched for the key ``URI="..."`` and ``IV=0x...``
    attributes and for the segment lines. The key is downloaded and an
    AES-CBC cipher is built from it, and the output directory is created.

    NOTE(review): the returned cipher object keeps chaining state between
    ``decrypt()`` calls; code that decrypts several independent segments
    with this one object has to take that into account.

    :param url_course: URL of the lesson playlist
    :param cyt_course_dir: directory the lesson will be saved to
    :return: ``(ts_urls, cryptor)``: absolute segment URLs and the AES cipher
    :raises ValueError: when the playlist carries no key URI or no IV
    '''
    from urllib.parse import urljoin

    playlist_resp = requests.get(url=url_course, params=None, headers=None)
    playlist_resp.raise_for_status()
    resp_course_info = playlist_resp.text

    # Match only up to the closing quote of the URI value; a greedy match
    # would swallow any quoted attribute that follows on the same line.
    key_uri_match = re.search(r'URI="([^"]+)"', resp_course_info)
    iv_match = re.search(r'IV=(0[xX][0-9a-fA-F]+)', resp_course_info)
    if key_uri_match is None or iv_match is None:
        raise ValueError(f'no key URI / IV found in playlist {url_course}')
    # The IV attribute is a hexadecimal number: convert its value to 16 raw
    # bytes instead of using the characters of the hex text as the IV.
    iv = int(iv_match.group(1), 16).to_bytes(16, 'big')

    # Segment lines are relative to the playlist's own directory.
    ts_url_prefix = url_course[:url_course.rfind("/") + 1]
    ts_urls = [ts_url_prefix + name for name in re.findall(r"v.+ts\?start=.+", resp_course_info)]

    # urljoin leaves an absolute key URI untouched and resolves a relative one.
    key_url = urljoin(url_course, key_uri_match.group(1))
    # cookie_dict holds a ready-made Cookie *header*; sending it through
    # cookies= would transmit a cookie that is literally named "Cookie".
    cookie_header = cookie_dict.get('Cookie')
    key_headers = {'Cookie': cookie_header} if cookie_header else None
    key_resp = requests.get(url=key_url, params=None, headers=key_headers)
    key_resp.raise_for_status()
    cryptor = AES.new(key_resp.content, AES.MODE_CBC, iv=iv)

    os.makedirs(cyt_course_dir, exist_ok=True)
    return ts_urls, cryptor


def write_course_to_file(cryptor, ts_url, file_name):
    '''
    Download one encrypted segment, decrypt it and save it as ``<file_name>.mpg``.

    :param cryptor: AES-CBC cipher object used for decryption; it may be
        shared between calls
    :param ts_url: URL of the segment
    :param file_name: output path without the ``.mpg`` extension
    :return: None
    '''
    # cookie_dict holds a ready-made Cookie *header*; sending it through
    # cookies= would transmit a cookie that is literally named "Cookie".
    cookie_header = cookie_dict.get('Cookie')
    headers = {'Cookie': cookie_header} if cookie_header else None
    ts_resp = requests.get(url=ts_url, params=None, headers=headers)
    # Fail loudly instead of "decrypting" an HTTP error page into the file.
    ts_resp.raise_for_status()
    encrypted = ts_resp.content

    # Each segment is its own CBC stream that starts from the IV, but a CBC
    # cipher object chains from whatever ciphertext it saw last. Feeding the
    # original IV in as a throw-away leading block makes the following block
    # (the real first block) decrypt against that IV regardless of earlier
    # calls; the garbage produced for the throw-away block is then dropped.
    # Assumes the cipher exposes its creation IV as `.iv` (PyCryptodome CBC
    # objects do); otherwise fall back to a plain decrypt.
    iv = getattr(cryptor, 'iv', None)
    if iv:
        iv = bytes(iv)
        result = cryptor.decrypt(iv + encrypted)[len(iv):]
    else:
        result = cryptor.decrypt(encrypted)

    with open(f'{file_name}.mpg', 'wb') as f:
        f.write(result)


def compare_list_polishing(list1: Iterable, list2: Iterable, polishing_str=None) -> tuple:
    '''
    Pad the shorter of two iterables so that both have the same length.

    The longer argument is returned unchanged (same object); the shorter one
    is returned as a new list with ``polishing_str`` appended until the
    lengths match. When the lengths already match both are returned as-is.
    Iterators and generators, which have no length, are materialised into
    lists first.

    :param list1: first iterable
    :param list2: second iterable
    :param polishing_str: filler value appended to the shorter iterable
    :return: ``(list1, list2)`` with equal lengths
    :raises TypeError: when either argument is not iterable
    '''
    # Both arguments must be iterable; checking with "or" would let a single
    # bad argument through and fail later with an unrelated error.
    if not (isinstance(list1, Iterable) and isinstance(list2, Iterable)):
        raise TypeError("list1/list2必须是可迭代类型")
    if not hasattr(list1, '__len__'):
        list1 = list(list1)
    if not hasattr(list2, '__len__'):
        list2 = list(list2)

    shortfall = len(list1) - len(list2)
    if shortfall < 0:
        return list(list1) + [polishing_str] * -shortfall, list2
    if shortfall > 0:
        return list1, list(list2) + [polishing_str] * shortfall
    return list1, list2


def down_course_sync(url_course,cyt_course_dir):
    '''
    Download all segments of one lesson sequentially (single-threaded).

    A lesson whose directory already exists is treated as downloaded and is
    skipped. Otherwise the directory is created and every segment is saved
    as ``<cyt_course_dir>/<index>.mpg``; the elapsed time is printed at the end.

    :param url_course: URL of the lesson playlist
    :param cyt_course_dir: directory the segments are written to
    :return: None
    '''
    began_at = time.time()
    # Guard clause: an existing directory means nothing to do.
    if os.path.exists(cyt_course_dir):
        print(f"{cyt_course_dir}课程已下载,无需重复下载")
        return
    os.makedirs(cyt_course_dir)
    segment_urls, cipher = get_ts_url(url_course=url_course, cyt_course_dir=cyt_course_dir)
    position = 0
    for segment_url in segment_urls:
        target = f'{cyt_course_dir}/{position}'
        write_course_to_file(cryptor=cipher, ts_url=segment_url, file_name=target)
        position += 1
    elapsed = time.time() - began_at
    print('%2.2f second' % elapsed)

def mpg_video_merge(cyt_course_dir,out_file_name):
    '''
    Concatenate a lesson's numbered ``.mpg`` segments into a single file.

    Segments named ``<number>.mpg`` inside ``cyt_course_dir`` are joined in
    numeric order into ``out_file_name``, which is then moved to the parent
    directory of ``cyt_course_dir``. Nothing is merged when the output already
    exists in either place. The merge is done in Python, so it does not depend
    on a platform shell command and never changes the working directory.

    :param cyt_course_dir: directory holding the downloaded segments
    :param out_file_name: file name (not a path) of the merged video
    :return: None
    :raises Exception: when ``cyt_course_dir`` does not exist
    :raises FileNotFoundError: when a merge is needed but no segment is found
    '''
    if not os.path.isdir(cyt_course_dir):
        raise Exception("目录不存在")
    merged_path = os.path.join(cyt_course_dir, out_file_name)
    parent_path = os.path.join(cyt_course_dir, os.pardir, out_file_name)

    if os.path.exists(merged_path) or os.path.exists(parent_path):
        print(f'{out_file_name}文件已存在,无需合并')
    else:
        print("文件开始合并")
        # Only "<number>.mpg" files are segments; this leaves out ts_urls.txt
        # and any merged output. Sort numerically so 10 comes after 9.
        segment_names = sorted(
            (name for name in os.listdir(cyt_course_dir)
             if name.endswith('.mpg') and name[:-4].isdecimal()),
            key=lambda name: int(name[:-4]),
        )
        if not segment_names:
            raise FileNotFoundError(f'no .mpg segments found in {cyt_course_dir}')
        # Build under a temporary name so an interrupted merge is never
        # mistaken for a finished one on the next run.
        partial_path = merged_path + '.part'
        with open(partial_path, 'wb') as merged:
            for name in segment_names:
                with open(os.path.join(cyt_course_dir, name), 'rb') as segment:
                    shutil.copyfileobj(segment, merged)
        os.replace(partial_path, merged_path)
        print(f'{cyt_course_dir}文件已合并为{out_file_name},执行结果:0')

    if not os.path.exists(parent_path):
        # Move the merged video up one level, next to the lesson directory.
        shutil.move(merged_path, parent_path)
        print(f"{out_file_name}文件移动到上级目录")
    else:
        print(f"{out_file_name}文件无需移动到上级目录")


def down_course_batch_thread(url_course, cyt_course_dir,thread_num=20):
    '''
    Download all segments of one lesson using a thread pool.

    A lesson whose directory already exists is treated as downloaded and is
    skipped. Otherwise the directory is created, the list of segment URLs is
    saved to ``ts_urls.txt`` inside it, and every segment is downloaded to
    ``<cyt_course_dir>/<index>.mpg``. The elapsed time is printed at the end.

    :param url_course: URL of the lesson playlist
    :param cyt_course_dir: directory the segments are written to
    :param thread_num: maximum number of worker threads
    :return: None
    '''
    start_time = time.time()
    if os.path.exists(cyt_course_dir):
        print(f"{cyt_course_dir}课程已下载,无需重复下载")
        return
    os.makedirs(cyt_course_dir)
    ts_urls, cryptor = get_ts_url(url_course=url_course, cyt_course_dir=cyt_course_dir)
    with open(f'{cyt_course_dir}/ts_urls.txt', 'w', encoding='utf-8') as f:
        f.write(str(ts_urls))
    # With an empty playlist there is nothing to fetch; padding the URL list
    # up to the length of the cipher list would schedule a download of None.
    if ts_urls:
        # executor.map takes one iterable per parameter, so hand every call
        # the same cipher object.
        cryptors = [cryptor] * len(ts_urls)
        file_names = [f'{cyt_course_dir}/{index}' for index in range(len(ts_urls))]
        with concurrent.futures.ThreadPoolExecutor(max_workers=thread_num) as executor:
            # Draining the result iterator re-raises the first worker
            # exception here instead of losing it.
            for _ in executor.map(write_course_to_file, cryptors, ts_urls, file_names):
                pass
    print('%s课程下载耗时,%2.2f second' % (cyt_course_dir,time.time() - start_time))


if __name__ == "__main__":
    # Script entry point: log in, then download and merge every lesson of
    # every course on the account.
    start_time = time.time()
    # Credentials are read from the environment so that no account secret is
    # stored in the source file.
    username = os.environ.get('WSAVS_USERNAME', '')
    password = os.environ.get('WSAVS_PASSWORD', '')
    if not (username and password):
        raise SystemExit('Set WSAVS_USERNAME and WSAVS_PASSWORD before running this script')
    login_and_set_cookie(username=username, password=password)
    for course_info in get_my_course_list():
        # course_info carries exactly the keyword arguments get_my_course_detail expects.
        course_detail_info = get_my_course_detail(**course_info)
        for course, lessons in course_detail_info.items():
            for course_detail in lessons:
                lesson_name = course_detail["course_name_detail"]
                # One directory per lesson, nested under the course name.
                lesson_dir = f'{course}/{lesson_name}'
                down_course_batch_thread(url_course=course_detail["course_url_detail"], cyt_course_dir=lesson_dir)
                mpg_video_merge(cyt_course_dir=lesson_dir, out_file_name=f'{lesson_name}_all.mpg')
    print('程序执行总耗时%2.2f second' % (time.time() - start_time))

标签:name,course,python,list,detail,宠医堂,url,多线程,dir
From: https://www.cnblogs.com/qtclm/p/17037185.html

相关文章