import concurrent
import concurrent.futures
import os
import re
import time
from collections.abc import Iterable

from Crypto.Cipher import AES

from tool.data_util import dataUtil
from tool.request_main import requestMain
# Shared HTTP client used for every request made by this script.
request = requestMain()

# Base URL of the Lagou gateway API.
lagouApi_host = 'https://gate.lagou.com'

# Placeholder: replace with the cookie string of your own logged-in account.
cookie = '自己账号的cookie信息'

# Headers sent with every request; the second value is the str() of a dict.
headers = {
    "cookie": cookie,
    "x-l-req-header": str({"deviceType": "1"}),
}
class getLagouCourseData(object):
    """Download the text and audio of the purchased Lagou courses.

    The text of every lesson is written as a Markdown file below
    ``self.path``; the audio segments of a lesson are decrypted and written
    below ``self.path_audio/<lesson file name>/``.
    """

    # Single-pass replacement table used by check_filename.  "|" maps
    # straight to "." because the original chain was "|" -> "_" -> ".".
    _FILENAME_TABLE = str.maketrans({
        '\\': None, '/': None, '?': None, '"': None,
        '\n': None, '\b': None, '\f': None, '\t': None, '\r': None,
        ' ': None,
        '*': 'x', '<': '《', '>': '》', ':': '：',
        '|': '.', '_': '.',
    })

    def __init__(self):
        self.file_name_list = []
        self.path_audio = ""
        self.path = ""
        self.ts_url_list = []
        self.decrypt_key_list = []
        self.key_url_list = []
        self.ce_name = None
        self.dict_ts_url_key = {}
        self.file_audioMedia_url_list = []
        self.data = dataUtil()
        self.courseDetailDict = {}

    def compare_list_polishing(self, list1: Iterable, list2: Iterable, polishing_str=None) -> tuple:
        """Pad the shorter of two sequences so both have the same length.

        Args:
            list1: first sequence (must support ``len``).
            list2: second sequence (must support ``len``).
            polishing_str: value appended to the shorter sequence.

        Returns:
            ``(list1, list2)`` where the shorter one has been replaced by a
            padded ``list`` copy; the other one is returned unchanged.

        Raises:
            TypeError: if either argument is not iterable.
        """
        # Both arguments must be iterable (the original check used "or" and
        # only fired when neither was).
        if not (isinstance(list1, Iterable) and isinstance(list2, Iterable)):
            raise TypeError("list1/list2必须是可迭代类型")
        shortfall = len(list1) - len(list2)
        if shortfall < 0:
            return list(list1) + [polishing_str] * -shortfall, list2
        if shortfall > 0:
            return list1, list(list2) + [polishing_str] * shortfall
        return list1, list2

    def get_course_list(self):
        """Return the ``content`` of the purchased-course listing response."""
        request_url = '{}/v1/neirong/kaiwu/getAllCoursePurchasedRecordForPC'.format(lagouApi_host)
        request_courseList = request.run_main(method='get', url=request_url, headers=headers)
        return request_courseList['content']

    def get_course_detail(self, course_id):
        """Return the ``content`` of the lesson listing for ``course_id``."""
        request_url = "{}/v1/neirong/kaiwu/getCourseLessons?courseId={}".format(lagouApi_host, course_id)
        request_courseDetail = request.run_main(method='get', url=request_url, headers=headers)
        return request_courseDetail['content']

    def get_course_lesson_detail(self, courseLesson_id):
        """Return the ``content`` of the detail response for one lesson."""
        request_url = "{}/v1/neirong/kaiwu/getCourseLessonDetail?lessonId={}".format(lagouApi_host, courseLesson_id)
        request_courseDetail = request.run_main(method='get', url=request_url, headers=headers)
        return request_courseDetail['content']

    def check_filename(self, file_name):
        """Return ``file_name`` with characters unsuitable for Windows replaced.

        Removed: ``\\ / ? "``, whitespace control characters and spaces.
        Replaced: ``*`` -> ``x``, ``<`` -> ``《``, ``>`` -> ``》``,
        ``:`` -> ``：``, ``|`` and ``_`` -> ``.``.

        Args:
            file_name: the raw file name.

        Returns:
            The sanitised file name.
        """
        return file_name.translate(self._FILENAME_TABLE)

    def write_course(self, ce):
        """Fetch one lesson and write its text content to a Markdown file.

        Args:
            ce: lesson id.

        Returns:
            ``(file_name, audio_url)``; ``audio_url`` is ``None`` when the
            lesson has no ``audioMedia`` entry.
        """
        courseLessonDetail = self.get_course_lesson_detail(ce)
        file_name = self.check_filename(courseLessonDetail['theme'] + '.md')
        self.file_name_list.append(file_name)
        request.log.info("写入课程名称:{}".format(file_name))

        audio_url = None
        if courseLessonDetail.get('audioMedia'):
            audio_url = courseLessonDetail['audioMedia']['fileUrl']
            self.file_audioMedia_url_list.append(audio_url)

        # Write the text whenever there is any.  The original condition also
        # required an audio-link string that was never assigned, so no file
        # was ever written.
        file_content = courseLessonDetail['textContent']
        if file_content:
            with open(os.path.join(self.path, file_name), mode='w', encoding='utf-8') as file:
                file.write(file_content)
        return file_name, audio_url

    def get_course_details_main(self, course_id):
        """Store the lesson ids of ``course_id`` in ``courseDetailDict`` under the course name."""
        courseDetail = self.get_course_detail(course_id=course_id)
        courseDetail_name = courseDetail['courseName']
        courseDetail_idList = self.data.json_path_parse_public(
            json_path='$.courseSectionList[*].courseLessons[*].id', json_obj=courseDetail)
        self.courseDetailDict[courseDetail_name] = courseDetail_idList

    def get_encryKey_and_ts_url(self, url):
        """Read the playlist at ``url`` and record its segment and key URLs.

        ``self.ts_url_list`` is replaced by the absolute URLs of all lines
        ending in ``.ts``; the first ``URI="..."`` value found (if any) is
        appended to ``self.key_url_list``.
        NOTE(review): the response is presumably an HLS (m3u8) playlist - confirm.
        """
        text = request.run_main('get', url, data=None, headers=headers, res_format='text')
        if not text:
            self.ts_url_list = []
            return
        url_pre = url[:url.rfind("/") + 1]
        # splitlines()/strip() also cope with CRLF line endings.
        lines = (line.strip() for line in text.splitlines())
        self.ts_url_list = [url_pre + line for line in lines if line.endswith(".ts")]
        decrypt_url = re.search('URI="(.*?)"', text)
        if decrypt_url:
            # group(1) is exactly the quoted value; no manual slicing needed.
            self.key_url_list.append(decrypt_url.group(1))

    def get_decrypt_key(self, decrypt_url):
        """Download the key at ``decrypt_url`` and append it to ``decrypt_key_list``."""
        decrypt_key = request.run_main('get', decrypt_url, data=None, headers=headers, res_format='content')
        self.decrypt_key_list.append(decrypt_key)

    def write_ts_file(self, ts_url, decrypt_key, file_name):
        """Download one segment, decrypt it and write it as an ``.mp3`` file.

        Args:
            ts_url: URL of the segment.
            decrypt_key: AES key bytes.
            file_name: lesson file name; used as directory and file prefix.
        """
        encrypted = request.run_main('get', ts_url, data=None, headers=headers, res_format='content')
        # Suffix = text between the last "-" and the trailing ".ts".
        file_name_st_url = file_name + ts_url[ts_url.rfind("-") + 1:len(ts_url) - 3]
        # NOTE(review): the key doubles as IV here - confirm this matches
        # how the segments are actually encrypted.
        cryptor = AES.new(decrypt_key, AES.MODE_CBC, iv=decrypt_key)
        audio_file_path = '{}/{}'.format(self.path_audio, file_name)
        # exist_ok avoids the check-then-create race between worker threads.
        os.makedirs(audio_file_path, exist_ok=True)
        with open("{}/{}.mp3".format(audio_file_path, file_name_st_url), 'wb') as f:
            f.write(cryptor.decrypt(encrypted))

    def reset_args(self):
        """Clear the per-course working state."""
        self.ts_url_list = []
        # The original reset a non-existent "key_list" and left the
        # downloaded keys in place.
        self.decrypt_key_list = []
        self.key_url_list = []
        self.ce_name = None
        self.file_audioMedia_url_list = []

    def _download_audio(self, executor, file_name, audio_url):
        """Download and decrypt every audio segment of one lesson."""
        self.ts_url_list = []
        self.key_url_list = []
        self.decrypt_key_list = []
        self.get_encryKey_and_ts_url(audio_url)
        if not self.ts_url_list:
            return
        if not self.key_url_list:
            request.log.info("课程{}的音频未找到解密key,跳过下载".format(file_name))
            return
        self.get_decrypt_key(self.key_url_list[0])
        decrypt_key = self.decrypt_key_list[0]
        if not decrypt_key:
            request.log.info("课程{}的解密key获取失败,跳过下载".format(file_name))
            return
        # list() drains the iterator so exceptions from workers surface here.
        list(executor.map(
            lambda ts_url: self.write_ts_file(ts_url, decrypt_key, file_name),
            self.ts_url_list))

    def main(self, thread_num=10):
        """Download all purchased courses.

        Args:
            thread_num: size of the worker thread pools.
        """
        start_time = time.time()
        request.log.info("开始获取课程目录列表")
        courseList = self.get_course_list()
        courseList_id = self.data.json_path_parse_public(
            json_path='$.allCoursePurchasedRecord[?(@.courseType=2)].courseRecordList[*].id',
            json_obj=courseList)
        request.log.info("获取到的课程列表为:\n{}".format(courseList_id))
        request.log.info("开始获取课程目录详情")
        with concurrent.futures.ThreadPoolExecutor(max_workers=thread_num) as executor:
            list(executor.map(self.get_course_details_main, courseList_id))
        request.log.info("获取到的课程详情列表为:\n{}".format(self.courseDetailDict))
        request.log.info("开始获取课程详情")
        for ce_name, ce_list in self.courseDetailDict.items():
            self.reset_args()
            self.ce_name = ce_name
            self.path = '../拉勾课程/' + self.ce_name
            self.path_audio = self.path + '/' + 'audio'
            for directory in (self.path, self.path_audio):
                os.makedirs(directory, exist_ok=True)
            request.log.info("开始写入课程:{}".format(self.path))
            with concurrent.futures.ThreadPoolExecutor(max_workers=thread_num) as executor:
                # Each result pairs a lesson's file name with its own audio
                # URL, so segments end up under the right lesson.
                lessons = list(executor.map(self.write_course, ce_list))
                for file_name, audio_url in lessons:
                    if audio_url:
                        self._download_audio(executor, file_name, audio_url)
        request.log.info("课程数据写入完毕")
        request.log.info("使用耗时为{:6f}S".format(time.time() - start_time))
# Script entry point: build the downloader and run it with default settings.
if __name__ == "__main__":
    getLagouCourseData().main()
# --- Dependencies used by the script above (用到的依赖) ---
# File: request_main.py
import requests
# from tool.operation_logging import MyLog, logs
class requestMain(object):
    """Small convenience wrapper around the ``requests`` library.

    Every request is sent with ``verify=False``.  ``run_main`` is the main
    entry point: it dispatches on the HTTP method and returns the response
    body in the requested format.
    """

    # Certificate verification is disabled below, so silence the warning
    # urllib3 would otherwise emit for every request.
    requests.packages.urllib3.disable_warnings()

    def __init__(self):
        # Local import so this class does not depend on the file's import header.
        import logging

        # Callers use ``<instance>.log.info(...)``; logging configuration
        # (handlers, level) is left to the application.
        self.log = logging.getLogger(__name__)
        self.session = requests.session()

    @classmethod
    def check_headers_files(cls, files, headers):
        """Normalise ``files`` and ``headers``.

        ``files`` is kept only if it is a non-empty dict with at most four
        entries; ``headers`` is kept only if truthy.  Anything else becomes
        ``None``.

        Returns:
            ``(files, headers)`` after normalisation.
        """
        # isinstance first, so len() is never called on a non-dict value.
        if not (isinstance(files, dict) and files and len(files) <= 4):
            files = None
        if not headers:
            headers = None
        return files, headers

    def _send(self, http_method, url, headers, files, **payload):
        """Send one request with the given HTTP method and return the response."""
        files, headers = self.check_headers_files(files=files, headers=headers)
        return requests.request(method=http_method, url=url, headers=headers,
                                files=files, verify=False, **payload)

    def get_main(self, url, params, headers, files=None):
        """Send a GET request; ``params`` goes into the query string."""
        return self._send('GET', url, headers, files, params=params)

    def post_main(self, url, data, headers, files=None):
        """Send a POST request with ``data`` as the body."""
        return self._send('POST', url, headers, files, data=data)

    def put_main(self, url, data, headers, files=None):
        """Send a PUT request with ``data`` as the body."""
        return self._send('PUT', url, headers, files, data=data)

    def delete_main(self, url, data, headers, files=None):
        """Send a DELETE request with ``data`` as the body."""
        return self._send('DELETE', url, headers, files, data=data)

    def run_main(self, method, url, data=None, headers=None, files=None, res_format='json'):
        """Send a request and return its body in the requested format.

        Args:
            method: ``get``/``post``/``put``/``delete`` (case-insensitive).
            url: request URL.
            data: query parameters for GET, request body otherwise.
            headers: request headers.
            files: multipart upload dict, e.g.
                ``{'file': ('git.docx', open('git.docx', 'rb'))}``.
            res_format: ``json`` (default), ``text``/``str`` or ``content``
                (raw bytes); any other value is treated as ``json``.

        Returns:
            The decoded body, or ``None`` when the response is falsy (per
            the requests docs, an error status code) or decoding fails.

        Raises:
            Exception: if ``method`` is not supported.
        """
        verb = method.lower()
        if verb == 'get':
            res = self.get_main(url=url, params=data, headers=headers, files=files)
        elif verb == 'post':
            res = self.post_main(url=url, data=data, headers=headers, files=files)
        elif verb == 'put':
            res = self.put_main(url=url, data=data, headers=headers, files=files)
        elif verb == 'delete':
            res = self.delete_main(url=url, data=data, headers=headers, files=files)
        else:
            raise Exception("暂不支持的请求方式")

        if not res:
            return None
        try:
            wanted = res_format.lower()
            if wanted in ('text', 'str'):
                return res.text
            if wanted == 'content':
                return res.content
            return res.json()
        except Exception as e:
            # Best effort: report the decoding problem and return None.
            # KeyboardInterrupt/SystemExit are deliberately not caught.
            self.log.error('error:{}'.format(e))
            return None
# Manual smoke test: send one GET request and print the response object.
if __name__ == '__main__':
    r = requestMain()
    url = 'https://fwh.lpcollege.com/admin.php/system/feedback/index.html'
    # Pre-encoded (UTF-8) form of: page=1&limit=10&keywords=秦敏&startDate=&endDate=
    data = b'page=1&limit=10&keywords=\xe7\xa7\xa6\xe6\x95\x8f&startDate=&endDate='
    header = {'x-requested-with': 'XMLHttpRequest',
              'Cookie': 'PHPSESSID=437649699becad37fe1587064163e990b9e0e5b1ff81506b681069dbcdd3a035'}
    # Alternative: print(r.run_main('get', url, data=data, headers=header, res_format='json'))
    # get_main takes the query as ``params``; passing ``data=`` raised TypeError.
    print(r.get_main(url=url, params=data, headers=header))
# File: data_util.py
from jsonpath_rw_ext import parse
class dataUtil(object):
    """Helpers for pulling values out of JSON objects with JSONPath expressions."""

    def __init__(self):
        pass

    def depend_data_parse(self, dependkey, response_data, index='one'):
        """Look up ``dependkey`` in ``response_data`` and wrap the result in a dict.

        Args:
            dependkey: JSONPath expression; the text after its last ``.``
                becomes the key of the returned dict.
            response_data: JSON object to search.
            index: ``'one'`` stores only the first match; any other value
                stores the full list of matches.

        Returns:
            ``{key: value}``, or ``None`` when ``dependkey`` is empty or
            nothing matched.
        """
        if not dependkey:
            return None
        field_name = dependkey[dependkey.rfind('.') + 1:]
        try:
            matches = self.json_path_parse_public(json_path=dependkey, json_obj=response_data)
            if not matches:
                return None
            picked = matches[0] if index == 'one' else matches
            return {field_name: picked}
        except IndexError:
            return None

    def json_path_parse_public(self, json_path, json_obj, get_index: bool = False):
        """Evaluate ``json_path`` against ``json_obj``.

        Args:
            json_path: JSONPath expression.
            json_obj: JSON object to search.
            get_index: when true, return only the first match instead of
                the list of all matches.

        Returns:
            The list of matched values (or the first one when ``get_index``
            is true).  An empty list is returned when ``json_path`` is empty
            or any error occurs; the error is printed.
        """
        if not json_path:
            return []
        try:
            found = [match.value for match in parse(json_path).find(json_obj)]
            return found[0] if get_index else found
        except IndexError as indexerror:
            print(indexerror)
            return []
        except Exception as e:
            print(e)
            return []
# Manual smoke test: only constructs the helper.
if __name__ == "__main__":
    du = dataUtil()
    # Example call: du.json_path_parse_public(json_obj={"1":"2"},json_path="$.*")
# Tags (标签): 拉勾, python, request, self, list, url, file, path, 多线程
# Source: https://www.cnblogs.com/qtclm/p/17037281.html