Dingdian Novel Scraper, Advanced (Multiprocessing + Coroutines)
Recommendation:
Before reading this, you may want to read my earlier article (Async Optimization and Database Storage: An Advanced Dingdian Novel Scraper in Practice).
This article builds on that one by adding multiprocessing, which speeds up crawling considerably.
Case study: improving the Dingdian novel scraper (multiprocessing)
Optimization approach:
- Import: from multiprocessing import Pool
- For each listing page, create a process pool and, inside the for loop, submit one task per novel; the per-novel handling is wrapped in a picklable module-level function that is added to the pool as a task (see the sketch after this list).
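Below is a minimal sketch of this pattern, using a hypothetical crawl_book coroutine as a stand-in for the real per-novel logic shown in the full code further down. Each child process starts its own event loop with asyncio.run, because coroutines, sessions and event loops cannot be shared across processes:

import asyncio
from multiprocessing import Pool

async def crawl_book(book_info):
    # hypothetical stand-in for the real per-novel crawling coroutine
    await asyncio.sleep(0.1)
    print(f"finished {book_info}")

def run_book(book_info):
    # module-level worker: picklable, so it can be submitted to the pool;
    # each child process runs its own event loop
    asyncio.run(crawl_book(book_info))

if __name__ == '__main__':
    page_books = [("1234", "Book A"), ("5678", "Book B")]  # hypothetical listing-page data
    pool = Pool(4)
    for book_info in page_books:
        pool.apply_async(run_book, args=(book_info,))
    pool.close()  # stop accepting new tasks
    pool.join()   # wait for all submitted tasks to finish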
import asyncio
import logging
import time

import aiohttp
import aiomysql
from aiohttp import ContentTypeError
from lxml import etree
from multiprocessing import Pool

# Maximum number of coroutines fetching pages at the same time (per process)
CONCURRENCY = 4
# Request headers, shared by the main process and the worker processes
HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36 Edg/126.0.0.0"
}

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s : %(message)s')
class Spider(object):
    def __init__(self):
        # aiohttp session: the place to configure headers, proxies, cookies, etc.
        self.session = None
        # database connection pool, created later by init_pool()
        self.pool = None
        # Limit the number of concurrently running coroutines:
        # if the limit has not been reached, "async with self.semaphore" passes through
        # immediately and the real code inside runs; once the limit is reached, further
        # coroutines block at "async with self.semaphore" until one of the running
        # coroutines finishes and releases its slot.
        self.semaphore = asyncio.Semaphore(CONCURRENCY)
    # Initialise the aiomysql connection pool
    async def init_pool(self):
        self.pool = await aiomysql.create_pool(
            host="127.0.0.1",
            port=3306,
            user="root",
            password="123456",
            db="dingdian",
            autocommit=True  # commit automatically after every statement
        )
        # aiomysql.create_pool does not need an explicit loop argument;
        # it automatically uses the current event loop.

    # Close the connection pool
    async def close_pool(self):
        if self.pool:
            self.pool.close()
            await self.pool.wait_closed()
    # Fetch the page source of a url
    async def scrape_api(self, url):
        # limit the number of concurrent requests with the semaphore
        async with self.semaphore:
            logging.info(f"scraping {url}")
            try:
                async with self.session.get(url) as response:
                    # Throttle the crawl rate so we do not overload the target server
                    # or get banned/rate-limited for requesting too frequently.
                    await asyncio.sleep(1)
                    # In an async context the body must be awaited,
                    # e.g. await response.text() (or response.content.read() for raw bytes).
                    return await response.text()
            except ContentTypeError:
                # aiohttp raises ContentTypeError when the response content type is unexpected.
                # exc_info=True makes logging record the full exception, including the
                # stack trace, which is very useful for debugging.
                logging.error(f'error occurred while scraping {url}', exc_info=True)
    # Get the url of every novel category
    async def get_type(self):
        url = "https://www.cdbxs.com/sort/"
        source = await self.scrape_api(url)
        href_lists = etree.HTML(source).xpath('//ul[@class="nav"]/li/a/@href')[2:-4]
        type_lists = []
        for href in href_lists:
            type_lists.append(f"{url}{href.split('/')[2]}/1/")
        # print(type_lists)
        return type_lists

    # Get the maximum page number of a category
    async def get_max_page(self, first_page_url):
        source = await self.scrape_api(first_page_url)
        # print(source)
        max_page = etree.HTML(source).xpath('//a[13]/text()')
        return max_page
    # Get the novel information from one listing page
    async def get_book_info(self, every_page_url):
        source = await self.scrape_api(every_page_url)
        book_lists = []
        lis = etree.HTML(source).xpath("//ul[@class='txt-list txt-list-row5']/li")
        for li in lis:
            book_id_url = li.xpath("span[@class='s2']/a/@href")[0]
            book_id = book_id_url.split('/')[3]
            # title
            book_name = li.xpath("span[@class='s2']/a/text()")[0]
            # latest chapter
            new_chapter = li.xpath("span[@class='s3']/a/text()")[0]
            # author
            author = li.xpath("span[@class='s4']/text()")[0]
            # last update time
            update_time = li.xpath("span[@class='s5']/text()")[0]
            source = await self.scrape_api(f"https://www.cdbxs.com{book_id_url}")
            # word count
            font_num = etree.HTML(source).xpath("//p[6]/span/text()")[0]
            # synopsis
            summary = etree.HTML(source).xpath("//div[@class='desc xs-hidden']/text()")[0]
            # append the record as a tuple
            # print((book_id, book_name, new_chapter, author, update_time, font_num, summary))
            book_lists.append((book_id, book_name, new_chapter, author, update_time, font_num, summary))
        return book_lists
    # Get the chapter urls of one novel
    async def get_chapter_urls(self, chapter_list_url):
        source = await self.scrape_api(chapter_list_url)
        # chapter urls
        chapter_urls = map(lambda x: "https://www.cdbxs.com" + x, etree.HTML(source).xpath(
            "//div[@class='section-box'][2]/ul[@class='section-list fix']/li/a/@href | //div[@class='section-box'][1]/ul[@class='section-list fix']/li/a/@href"))
        return chapter_urls

    # Get the details of one chapter
    async def get_chapter_info(self, chapter_url):
        source = await self.scrape_api(chapter_url)
        # title
        title = etree.HTML(source).xpath("//h1[@class='title']/text()")
        # body text
        content = ''.join(etree.HTML(source).xpath("//div[@id='nb_content']/dd//text()"))
        # Return plain strings; save_to_mysql uses a parameterised query,
        # so the values do not need to be quoted by hand here.
        return (title[0] if title else ''), content
    # Save one record to MySQL
    async def save_to_mysql(self, table_name, table_column_str, table_values):
        async with self.pool.acquire() as conn:
            async with conn.cursor() as cursor:
                # Parameterised query: the driver quotes and escapes the values,
                # which avoids broken SQL (and SQL injection) when a title or
                # chapter body contains quotes.
                placeholders = ', '.join(['%s'] * len(table_values))
                sql = f'insert into {table_name}({table_column_str}) values({placeholders})'
                await cursor.execute(sql, table_values)
    async def main(self):
        # Build the aiohttp session used by the main process
        # (headers, proxies, cookies, etc. are configured here).
        self.session = aiohttp.ClientSession(headers=HEADERS)
        # Get the category urls
        type_lists = await self.get_type()
        # Each category url points at its first page
        for first_page_url in type_lists:
            # Drop the trailing "1/" to get the category url prefix
            type_url = first_page_url[:-2]
            # Maximum page number of this category
            max_page = await self.get_max_page(first_page_url)
            # Build the url of every page in this category
            for every_page in range(1, int(max_page[0]) + 1):
                every_page_url = f"{type_url}{every_page}/"
                # Novel information on this listing page
                book_info_lists = await self.get_book_info(every_page_url)
                # Create a process pool for this page
                pool = Pool(16)
                for book_info in book_info_lists:
                    # Crawl each novel in its own process: submit the picklable
                    # module-level worker function, never an awaited coroutine.
                    pool.apply_async(run_book, args=(book_info,))
                # Close the pool, i.e. stop accepting new tasks.
                pool.close()
                # Block the main process until every task in the pool has finished;
                # close() must be called before join().
                pool.join()
        # Close the main process's session
        await self.session.close()
    # run: crawl every chapter of one novel (executed inside a worker process)
    async def run(self, book_info):
        print(f"Crawling novel: {book_info[1]}...")
        # Each worker process needs its own session and database connection pool;
        # aiohttp and aiomysql objects cannot be shared across processes.
        self.session = aiohttp.ClientSession(headers=HEADERS)
        await self.init_pool()
        # Save the novel information
        await self.save_to_mysql('books',
                                 'book_id, book_name, new_chapter, author, update_time, font_num, summary',
                                 book_info)
        # Get the chapter urls
        book_id = book_info[0]
        chapter_urls = await self.get_chapter_urls(f"https://www.cdbxs.com/booklist/b/{book_id}/1")
        # Crawl the chapters of this novel concurrently with coroutines:
        # build the list of get_chapter_info tasks
        scrape_detail_tasks = [asyncio.ensure_future(self.get_chapter_info(chapter_url))
                               for chapter_url in chapter_urls]
        # run the tasks concurrently and collect the results
        # (asyncio.gather returns one (title, content) tuple per task, in order)
        chapter_details = list(await asyncio.gather(*scrape_detail_tasks))
        # Save to the database
        # 1. append book_id to every chapter record
        for i in range(len(chapter_details)):
            chapter_detail = list(chapter_details[i])
            chapter_detail.append(book_id)
            chapter_details[i] = tuple(chapter_detail)
        # 2. insert the chapters
        for chapter_detail in chapter_details:
            await self.save_to_mysql('chapters', 'chapter_name, chapter_content, bid', chapter_detail)
        # Close this process's connection pool and session
        await self.close_pool()
        await self.session.close()


# Worker-process entry point: multiprocessing can only pickle plain module-level
# functions and their arguments, so each worker process builds its own Spider
# (and with it its own event loop, session and connection pool) and crawls one novel.
def run_book(book_info):
    async def _crawl():
        spider = Spider()
        await spider.run(book_info)

    asyncio.run(_crawl())
if __name__ == '__main__':
    # start time
    start_time = time.time()
    # initialise the Spider
    spider = Spider()
    # get the event loop
    loop = asyncio.get_event_loop()
    # run the main coroutine until it completes
    loop.run_until_complete(spider.main())
    # end time
    end_time = time.time()
    logging.info(f'total time: {end_time - start_time}')
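The script assumes a local MySQL database named dingdian with books and chapters tables already created. The article does not show the table definitions, so the following is only a minimal sketch that matches the INSERT statements above; the column names come from the code, while the column types, lengths and charset are assumptions:

import asyncio
import aiomysql

# Assumed schema matching the INSERT statements used by the spider.
BOOKS_DDL = """
CREATE TABLE IF NOT EXISTS books (
    book_id     VARCHAR(32),
    book_name   VARCHAR(255),
    new_chapter VARCHAR(255),
    author      VARCHAR(255),
    update_time VARCHAR(64),
    font_num    VARCHAR(64),
    summary     TEXT
) DEFAULT CHARSET=utf8mb4
"""

CHAPTERS_DDL = """
CREATE TABLE IF NOT EXISTS chapters (
    chapter_name    VARCHAR(255),
    chapter_content LONGTEXT,
    bid             VARCHAR(32)
) DEFAULT CHARSET=utf8mb4
"""

async def create_tables():
    conn = await aiomysql.connect(host="127.0.0.1", port=3306, user="root",
                                  password="123456", db="dingdian",
                                  charset="utf8mb4", autocommit=True)
    cursor = await conn.cursor()
    await cursor.execute(BOOKS_DDL)
    await cursor.execute(CHAPTERS_DDL)
    await cursor.close()
    conn.close()

if __name__ == '__main__':
    asyncio.run(create_tables())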
More polished scraping content will follow (published in the order of a training provider's scraping course; feel free to follow for future posts).