多线程和多进程爬虫
在很多场景中,爬虫需要抓取大量的数据,而且需要做大量的分析工作。如果只使用单线程的爬虫,效率会非常低。通常有实用价值的爬虫会使用多线程或多进程,这样可以很多工作同时完成,尤其在多CPU的机器上,执行效率更是惊人。
一、线程与进程
1、进程
计算机程序有静态和动态的区别。静态的计算机程序就是存储在磁盘上的可执行二进制文件,而动态的计算机程序就是将这些可执行文件加载到内存中并被操作系统调用,这些动态的计算机程序被称为一个进程,也就是说,进程是活跃的,只有可执行程序被调入内存中才叫进程。每个进程都拥有自己的地址空间、内存、数据栈以及其他用于跟踪执行的辅助数据。操作系统会管理系统中的所有进程的执行。并为这些进程合理的分配时间。进程可以通过派生新的进程来执行其他任务,不过由于每个新进程也都拥有自己的内存和数据栈等,所以只能采用进程间通信的方式共享信息。
2、线程
线程与进程类似,不过线程是在同一个进程下执行的,并共享同一个上下文。也就是说,线程属于进程,而且线程必须依赖进程才能执行。一个进程可以包含一个或多个线程。
线程包括开始、执行和结束三部分。他有一个指令指针,用于记录当前运行的上下文,当其它线程运行时,当前线程有可能被抢占或临时挂起。
一个进程中的各个线程与主线程共享同一块数据空间,因此相对于独立的进程而言,线程间的信息共享和通信更容易。线程一般是以并发方式执行的,正是由于这种并行和数据共享机制,使得多任务间的协作成为可能。当然,在单核CPU的系统中,并不存在真正的并发执行,所以线程的执行实际上是同步执行的,只是系统会根据调度算法在不同的时间安排某个线程在CPU上执行一小会儿,然后就让其他线程在CPU上再执行,通过这种多个线程之间不断切换的方式让多个线程交替执行。因此,从宏观上看,即使在单核CPU的系统上仍然想多个线程并发运行一样。
当然,多线程之间共享数据并不是没有风险。如果两个或多个线程访问了同一块数据,由于数据访问顺序不同,可能导致结果不一致。这种情况通常称为静态条件,幸运的是,大多数线程库都有一些机制让共享内存区域的数据同步,也就是说,当一个线程访问这片内存区域时,这片内存区域就暂时被锁定,其他线程就只能等待这片内存区域解锁后在访问。
二、Python与线程
使用_thread模块中的start_new_thread函数会直接开启一个线程,该函数的第一个参数需要指定一个函数,可以把这个函数称为线程函数,当线程启动时会自动调用这个函数。start_new_thread函数第二个参数是给线程传递的参数,必须是元组类型。
1、线程和锁
在普通的线程代码,在main函数的最后需要使用sleep函数让程序处于休眠状态,其实,这是因为程序无法知道是否有线程正在执行,以及是否所有的线程函数都执行完毕。
这里的锁并不是将程序锁住不退出,而是通过锁可以让程序了解是否含有线程函数没执行完,而且可以做到当所有的线程函数执行完毕后,程序会立刻退出,而无序任何等待。
锁的使用分为创建锁、获取锁和释放锁。完成这3个功能需要_thread模块中的一个函数和两个方法,allocate_lock函数用于创建锁对象,然后使用锁对象的acquire方法获取锁,如果不需要锁了,可以使用锁对象的release方法释放锁。如果要判断锁是否被释放,可以使用锁对象的locked方法。
三、高级线程模块
更高级的线程模块threading。在threading模块中有一个非常重要的Thread类,该类的实例表示一个执行线程的对象。
在前面讲的_thread模块可以看作线程的面向过程版本,而Thread类可以看作线程的面向对象版本。
1、Thread类与线程函数
在前面的例子中使用锁检测线程是否释放,以及使用锁可以保证所有的线程函数都执行完毕再往下执行。如果使用Thread类处理线程就方便得多了,可以直接使用Thread对象的join方法等待线程函数执行完毕再往下执行,也就是说,在主线程中调用Thread对象的join方法,并且Thread对象的线程函数没有执行完毕,主线程会处于阻塞状态。
使用Thread类也很简单,首先需要创建类的实例,通过Thread类构造方法的target关键字参数执行线程函数,通过args关键字参数指定传给线程函数的参数。然后调用Thread对象的start方法启动线程。
2、Thread类与线程对象
Thread类构造方法的target关键字参数不仅可以是一个函数,还可以是一个对象,可以称这个对象为线程对象。其实线程调用的仍然是函数,只是这个函数用对象进行了封装。这么做的好处是可以将与线程函数相关的代码都放在对象对应的类中,这样更能体现面向对象的封装性。
线程对象对应的类需要有一个可以传入线程函数和参数的构造方法,而且在类中还必须有一个名为“call”的方法。当线程启动时,会自动调用线程对象的call方法,然后在该方法中调用线程函数。
3、从Thread类继承
为了更好地对与线程有关的代码进行封装,可以从Thread类派生一个子类。然后将与线程有关的代码放到这个类中。Thread类的子类的使用方法与Thread相同。从Thread类继承最简单的方式是子类的构造方法中通过super函数调用父类的构造方法,并传入相应的参数值。
四、线程同步
多线程的目的就是让多段程序并发运行,但在一些情况下,让多段程序同时运行会造成很多麻烦,如果这些并发运行的程序还共享数据,有可能造成脏数据以及其他数据不一致的后果。
1、线程锁
线程锁的目的就是将一段代码锁住,一旦获取了锁权限,除非释放线程锁,否则其他任何代码都无法再次获得锁权限。为了使用线程锁,首先需要创建Lock类的实例,然后通过Lock对象的acquire方法获取锁权限,当需要完成原子操作的代码段执行完后,再使用Lock对象的realease方法释放锁,这样其他代码就可以再次获得这个锁的权限了。要注意的是,锁对象要放到线程函数的外面作为一个全局变量,这样所有的线程函数实例都可以共享这个变量,如果将锁对象放到线程函数内部,那么这个锁对象就变成局部变量了,多个线程函数实例使用的是不同的锁对象,所以仍然不能有效保护原子操作的代码。
2、信号量
信号量是最古老的同步原语之一,它是一个计数器,用于记录资源消耗情况。当资源消耗时递减,当资源释放时递增。可以认为信号量代表资源是否可用。消耗资源使计数器递减的操作习惯上称为P,当一个线程对一个资源完成操作时,该资源需要返回资源池,这个操作一般称为V。Python语言统一了所有的命名,使用与线程锁同样的方法名消耗和释放资源。acquire方法用于消耗资源,调用该方法计数器会减一,release方法用于释放资源,调用该方法计数器会加1。
使用信号量首先要创建BoundedSemaphore类的实例,并且通过该类的构造方法传入计数器的最大值,然后就可以使用BoundedSemaphore对象的acquire方法和release方法获取资源和释放资源。
五、多进程
尽管多线程可以实现并发执行,不过多个线程之间是共享当前进程的内存的,也就是说,线程可以申请到的资源有限。要想进一步发挥并发的作用,可以考虑使用多进程。
如果建立的进程比较多,可以使用multiprocessing模块的进程池,通过Pool类构造方法的processes参数,可以指定创建的进程数。Pool类有一个map方法,用于将回调函数与要给回调函数传递的数据管理起来
实战案例:爬取豆瓣音乐(https://music.douban.com/)
这个网站之前爬过,在这里再爬一次,是为了与之前的速度进行对比。并且下面的爬虫案例是多线程的。
创建四个线程
导入线程类
import threading
导入时间模块,可以更直观的看到运行总时间
import datetime
import requests
from bs4 import BeautifulSoup
from lxml import etree
import time
import random
from UA_info import ua_list
starttime = datetime.datetime.now()
创建线程锁
lock = threading.Lock()
使用生成器创建列表
urls = ['https://music.douban.com/top250?start={}'.format(i) for i in range(0,100,25)]
headers = {
'User-Agent':random.choice(ua_list)
}
def get_url():
#声明全局变量
global urls
#获取url之前创建一个锁
lock.acquire()
if len(urls) == 0:
lock.release()
return None
else:
url = urls[0]
del urls[0]
lock.release()
return url
def get_href(url,thread_name):
response = requests.get(url,headers=headers)
if response.status_code == 200:
soup = BeautifulSoup(response.text,'lxml')
href_list = soup.find_all("a",attrs={"class":"nbg"})
for href in href_list:
get_music_data(href['href'],thread_name)
def get_music_data(href,thread_name):
response = requests.get(url=href,headers=headers)
if response.status_code == 200:
lxml = etree.HTML(response.text)
title = lxml.xpath('//[@id="wrapper"]/h1/span/text()')
acter = lxml.xpath('//div[@id="info"]//span/a/text()')
score = lxml.xpath('//[@id="interest_sectl"]/div/div[2]/strong/text()')
info = {
'title':title[0],
'acter':acter[0],
'score':score[0]+"分"
}
print(thread_name,info)
print('================================')
这是一个线程类
class SpiderThread(threading.Thread):
def init(self,name):
threading.Thread.init(self)
#name是线程名
self.name = name
def run(self):
while True:
url = get_url()
if url != None:
get_href(url,self.name)
else:
break
if name == 'main':
#创建四个线程
thread1 = SpiderThread('thread1')
thread2 = SpiderThread('thread2')
thread3 = SpiderThread('thread3')
thread4 = SpiderThread('thread4')
thread1.start()
thread2.start()
thread3.start()
thread4.start()
thread1.join()
thread2.join()
thread3.join()
thread4.join()
print("退出爬虫")
endtime = datetime.datetime.now()
print("消耗时间:",(endtime - starttime).seconds,"秒")
thread1
thread3
thread4
thread2
thread1
thread4
thread2
退出爬虫
消耗时间: 31 秒
导入进程池
from multiprocessing import Pool
import datetime
import requests
from bs4 import BeautifulSoup
from lxml import etree
import time
import random
from UA_info import ua_list
starttime = datetime.datetime.now()
urls = ['https://music.douban.com/top250?start={}'.format(i) for i in range(0,100,25)]
headers = {
'User-Agent':random.choice(ua_list)
}
def get_href(url):
response = requests.get(url,headers=headers)
if response.status_code == 200:
soup = BeautifulSoup(response.text,'lxml')
href_list = soup.find_all("a",attrs={"class":"nbg"})
for href in href_list:
get_music_data(href['href'])
def get_music_data(href):
response = requests.get(url=href,headers=headers)
if response.status_code == 200:
lxml = etree.HTML(response.text)
title = lxml.xpath('//[@id="wrapper"]/h1/span/text()')
acter = lxml.xpath('//div[@id="info"]//span/a/text()')
score = lxml.xpath('//[@id="interest_sectl"]/div/div[2]/strong/text()')
info = {
'title':title[0],
'acter':acter[0],
'score':score[0]+"分"
}
print(info)
print('================================')
if name == 'main':
#创建并启动四个进程
pool = Pool(processes=4)
pool.map(get_href,urls)
print("退出爬虫")
endtime = datetime.datetime.now()
print("消耗时间:",(endtime - starttime).seconds,"秒")
退出爬虫
消耗时间: 32 秒