【一】进程间通信(管道)
- 借助于消息队列,进程可以将消息放入队列中,然后由另一个进程从队列中取出。
- 这种通信方式是非阻塞的,即发送进程不需要等待接收进程的响应即可继续执行。
- multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的
- 进程间通信(IPC)有两种方式:队列、管道
【1】介绍
(1)创建管道的类
Pipe([duplex])
- 在进程之间创建一条管道,并返回
(conn1,conn2)
,其中 conn1, conn2 表示管道两端的连接对象 - 强调一点:必须在产生 Process 对象之前产生管道
- 在进程之间创建一条管道,并返回
(2)参数介绍
duplex
- 默认管道是全双工的,如果将
duplex
射成 False,conn1 只能用于接收,conn2 只能用于发送。
- 默认管道是全双工的,如果将
(3)主要方法
conn1.recv()
- 接收 conn2.send(obj) 发送的对象。
- 如果没有消息可接收,recv方法会一直阻塞
- 如果连接的另外一端已经关闭,那么 recv 方法会抛出 EOFError
conn1.send(obj)
- 通过连接发送对象。obj 是与序列化兼容的任意对象。
(4)次要方法
-
conn1.close()
-
- 关闭连接。如果conn1被垃圾回收,将自动调用此方法
-
conn1.fileno()
-
- 返回连接使用的整数文件描述符
-
conn1.poll([timeout])
-
- 如果连接上的数据可用,返回True。
- timeout指定等待的最长时限。
- 如果省略此参数,方法将立即返回结果。
- 如果将timeout射成None,操作将无限期地等待数据到达。
-
conn1.recv_bytes([maxlength])
-
- 接收c.send_bytes()方法发送的一条完整的字节消息。
- maxlength指定要接收的最大字节数。
- 如果进入的消息,超过了这个最大值,将引发IOError异常,并且在连接上无法进行进一步读取。
- 如果连接的另外一端已经关闭,再也不存在任何数据,将引发EOFError异常。
-
conn.send_bytes(buffer [, offset [, size]])
-
- 通过连接发送字节数据缓冲区,buffer是支持缓冲区接口的任意对象,offset是缓冲区中的字节偏移量,而size是要发送字节数。
- 结果数据以单条消息的形式发出,然后调用c.recv_bytes()函数进行接收
-
conn1.recv_bytes_into(buffer [, offset]):
-
- 接收一条完整的字节消息,并把它保存在buffer对象中,该对象支持可写入的缓冲区接口(即bytearray对象或类似的对象)。
- offset指定缓冲区中放置消息处的字节位移。
- 返回值是收到的字节数。
- 如果消息长度大于可用的缓冲区空间,将引发BufferTooShort异常。
- 基于管道实现进程间通信(与队列的方式是类似的,队列就是管道加锁实现的)
【2】使用
# 管道类介绍
'''
# Pipe
from multiprocessing import Pipe
# [1]创建管道对象
left_pipe, right_pipe = Pipe() # 默认参数是 duplex:默认双通道的管道
# [2]主要的方法
# 1.接收数据
# 先将另一端关闭 ---> 一端取数据
left_pipe.close()
right_pipe.recv()
# 2.发送数据
left_pipe.close()
right_pipe.send()
'''
# 使用
from multiprocessing import Pipe, Process
def producer(pipe_conn, name):
# 【1】获取到两个管道对象:左侧管道对象、右侧管道对象
left_conn, right_conn = pipe_conn
# 【2】放数据
# 先关闭一侧管道
right_conn.close()
# 再通过右侧管道传数据
for i in range(5):
data = f'producer:>>> {name} 生产的 {i}'
print(data)
left_conn.send(data)
# 传递完所有数据之后一定不要忘了将打开的管道关闭
left_conn.close()
def customer(pipe_conn, name):
# [1]获取到两个管道对象:左侧管道对象、右侧管道对象
left_conn, right_conn = pipe_conn
# [2]取数据
# 关闭左管道数据
left_conn.close()
# 通过右侧管道取数据
while True:
data = right_conn.recv()
print(f'customer:>>> {name} 消费了 {data}')
if not data:
break
right_conn.close()
def main():
# 【一】创建管道对象
pipe = Pipe()
# 【二】创建消费者对象和生产者对象
producer_one = Process(
target=producer,
args=(pipe, f'producer_one')
)
# 【三】创建消费者
customer_one = Process(
target=customer,
args=(pipe, f'customer_one')
)
producer_one.start()
customer_one.daemon = True
customer_one.start()
producer_one.join()
# 管道需要创建一个管道对象
# 管道对象里面有左右两个管道对象
# 传数据的时候要关闭一侧,从另一侧传数据进去
# 取数据的时候也要关闭一侧,从另一侧取数据
if __name__ == '__main__':
main()
# producer:>>> producer_one 生产的 0
# producer:>>> producer_one 生产的 1
# producer:>>> producer_one 生产的 2
# producer:>>> producer_one 生产的 3
# producer:>>> producer_one 生产的 4
# customer:>>> customer_one 消费了 producer:>>> producer_one 生产的 0
# customer:>>> customer_one 消费了 producer:>>> producer_one 生产的 1
# customer:>>> customer_one 消费了 producer:>>> producer_one 生产的 2
# customer:>>> customer_one 消费了 producer:>>> producer_one 生产的 3
# customer:>>> customer_one 消费了 producer:>>> producer_one 生产的 4
【二】多线程理论
【1】什么是线程
-
在操作系统中,每一个进程都有一块内存空间地址,你的程序就泡在这块内存地址上。
-
在传统操作系统中,每个进程有一个地址空间,而且默认就有一个控制线程
-
线程,就是一条流水线工作的过程
- 一条流水线必须属于一个车间,一个车间的工作过程是一个进程
- 车间负责把资源整合到一起,是一个资源单位,而一个车间内至少有一个流水线
- 流水线的工作需要电源,电源就相当于CPU
-
所以进程只是用来把资源集中到一起(进程只是一个资源单位,或者说资源集合),而线程才是 cpu 上的执行单位。
-
多线程就是在进程 CPU 处理多个任务的逻辑。
-
例如:
- 进程就是你的资源单位 :车间 ---》存储设备及资源
- 线程就是你的执行单位 :流水线 ---》负责对数据进行加工和处理
【总结】
-
每一个进程必定自带一个线程
-
进程:资源单位
-
- 起一个进程仅仅只是 在内存空间中开辟出一块独立的空间
-
线程:执行单位
-
- 真正被CPU执行的其实是进程里面的线程
- 线程指的就是代码的执行过程,执行代码中所需要使用到的资源都找所在的进程索要
-
进程和线程都是虚拟单位,只是为了我们更加方便的描述问题
【2】线程的创建开销
(1)进程和线程创建的开销问题
-
进程的创建开销 > 线程的创建开销
-
比如一个软件就是一个工厂
- 工厂内部有很多流水线
- 流水线工作需要资源(基于电工作),电源就是:一个CPU
- 一个车间就是一个进程,一个车间内部至少会有一个线程(流水线)
-
创建进程 ---》相当于你要创建一个车间 ---》选地,买设备
-
创建线程 ---》有了车间,只需要增加流水线 ---》减少了开销
【3】进程和线程之间的关系
-
进程与进程之间的关系是竞争关系
- 在同一块内存中抢占内存空间 ---》内存空间越大你的程序跑的速度就越快
-
线程与线程之间的关系是协作关系
- 线程是在同一块进程之下的工友
【4】进程和线程的区别
-
Threads share the address space of the process that created it; processes have their own address space.
-
- 线程共享创建它的进程的地址空间; 进程具有自己的地址空间。
-
Threads have direct access to the data segment of its process; processes have their own copy of the data segment of the parent process.
-
- 线程可以直接访问其进程的数据段; 进程具有其父进程数据段的副本。
-
Threads can directly communicate with other threads of its process; processes must use interprocess communication to communicate with sibling processes.
-
- 线程可以直接与其进程中的其他线程通信; 进程必须使用进程间通信与同级进程进行通信。
-
New threads are easily created; new processes require duplication of the parent process.
-
- 新线程很容易创建; 新进程需要复制父进程。
-
Threads can exercise considerable control over threads of the same process; processes can only exercise control over child processes.
-
- 线程可以对同一进程的线程行使相当大的控制权。 进程只能控制子进程。
-
Changes to the main thread (cancellation, priority change, etc.) may affect the behavior of the other threads of the process; changes to the parent process does not affect child processes.
-
- 对主线程的更改(取消,优先级更改等)可能会影响该进程其他线程的行为; 对父进程的更改不会影响子进程。
-
思考:
- 开发一款文字处理软件 --- 进程
- 获取用户输入的功能 --- 线程
- 实时展示到屏幕的功能 --- 线程
- 自动保存数据到硬盘的功能 --- 线程
-
开启一个文字处理软件,该进程需要办不止一件事情,比如监听键盘输入,处理文字,定时自动将文字保存到硬盘 ...
-
这三个任务操作的都是同一块数据,因而不能用多进程。
-
只能在一个进程里并发开启三个线程
-
如果是单线程,那就只能是键盘输入时,不能处理文字和自动保存,自动保存时也不能输入和处理文字。
【三】开设多线程的两种方式
- 多进程和多线程只有在遇到阻塞的时候才能体现出速度快,如果没有阻塞,就没有效率。
【1】threading 模块介绍
- multiprocess模块的完全模仿了threading模块的接口
- 二者在使用层面,有很大的相似性.
from threading import Thread
import time
import random
class MyThread(Thread):
def __init__(self,name,*args,**kwargs):
super().__init__(*args,**kwargs)
self.name = name
def run(self):
print(f'{self.name} is starting ...')
sleep_time = random.randint(1,5)
print(f'{self.name} is start sleep time is {sleep_time}s ...')
time.sleep(sleep_time)
print(f'{self.name} is end sleep time is {sleep_time}s ...')
print(f'{self.name} is ending ...')
# 方式一:通过线程的对象来开启多线程(直接调用Thread方法)
def work(name):
print(f'{name} is starting ...')
print(f'thread {name} pid is {os.getpid()} ppid is {os.getppid()} ...')
sleep_time = random.randint(1,5)
print(f'{name} is start sleep time is {sleep_time}s ...')
time.sleep(sleep_time)
print(f'{name} is end sleep time is {sleep_time}s')
print(f'{name} is ending ...')
def main():
task_list = []
for i in range(3):
task = Thread(
target=work,
args=(f'thread_{i}',)
)
task.start()
task_list.append(task)
for task in task_list:
task.join()
# 方式二:继承Thread父类
def main_one():
task_list = []
for i in range(5):
task = MyThread(name=f'thread_{i}')
task.start()
task_list.append(task)
for task in task_list:
task.join()
if __name__ == '__main__':
print(f'main process pid is {os.getpid()} ppid is {os.getppid()} ...')
print(f'main process starting ...')
start_time = time.time()
# main()
main_one()
end_time = time.time()
print(f'main process ending ...')
print(f'总耗时:>>>{end_time-start_time}s')
# main process pid is 11224 ppid is 20884 ...
# main process starting ...
# thread_0 is starting ...
# thread_0 is start sleep time is 5s ...
# thread_1 is starting ...
# thread_1 is start sleep time is 5s ...
# thread_2 is starting ...
# thread_2 is start sleep time is 1s ...
# thread_3 is starting ...
# thread_3 is start sleep time is 5s ...
# thread_4 is starting ...
# thread_4 is start sleep time is 5s ...
# thread_2 is end sleep time is 1s ...
# thread_2 is ending ...
# thread_0 is end sleep time is 5s ...
# thread_4 is end sleep time is 5s ...
# thread_4 is ending ...thread_0 is ending ...
#
# thread_1 is end sleep time is 5s ...
# thread_3 is end sleep time is 5s ...
# thread_3 is ending ...thread_1 is ending ...
#
# main process ending ...
# 总耗时:>>>5.001804351806641s
【四】线程之间共享数据
-
多线程之间数据共享,进程之间不共享
-
示例
from threading import Thread
from multiprocessing import Process
number = 920
def work(name):
global number
print(f'{name} change before {number}')
number += 1
print(f'{name} change after {number}')
def main_process():
task_list = []
for i in range(5):
task = Process(
target=work,
args=(f'process_{i}',)
)
task.start()
task_list.append(task)
for task in task_list:
task.join()
print(number)
def main_thread():
task_list = []
for i in range(5):
task = Thread(
target=work,
args=(f'thread_{i}',)
)
task.start()
task_list.append(task)
for task in task_list:
task.join()
print(number)
if __name__ == '__main__':
main_thread() # 多线程
main_process() # 多进程
# 多线程
thread_0 change before 920
thread_0 change after 921
thread_1 change before 921
thread_1 change after 922
thread_2 change before 922
thread_2 change after 923
thread_3 change before 923
thread_3 change after 924
thread_4 change before 924
thread_4 change after 925
925
# 多进程
process_0 change before 920
process_0 change after 921
process_2 change before 920
process_2 change after 921
process_3 change before 920
process_3 change after 921
process_1 change before 920
process_1 change after 921
process_4 change before 920
process_4 change after 921
925
【五】多线程和多进程时间比较(爬虫技术)
import time
# 【一】需要两个模块
# 【1】模仿浏览器对网址发起请求
import requests # pip install requests
# 【2】解析页面数据的模块
from lxml import etree # pip install lxml
# 【3】模仿浏览器
from fake_useragent import UserAgent # pip install fake-useragent
from multiprocessing import Process
from threading import Thread
# 【二】解析网页请求及数据
class SpiderImg(object):
def __init__(self):
self.base_area = 'https://pic.netbian.com'
self.base_url = 'https://pic.netbian.com/4kdongman/'
self.headers = {
'User-Agent': UserAgent().random
}
def spider_tag_url(self):
img_data_dict = {}
response = requests.get(self.base_url, headers=self.headers)
# response.encoding = 'utf-8'
response.encoding = 'gbk'
page_text = response.text
tree = etree.HTML(page_text)
li_list = tree.xpath('//*[@id="main"]/div[4]/ul/li')
for li in li_list:
# //*[@id="main"]/div[4]/ul/li[1]/a
# ./a
detail_href = self.base_area + li.xpath('./a/@href')[0]
response = requests.get(detail_href, headers=self.headers)
response.encoding = 'gbk'
page_text = response.text
tree = etree.HTML(page_text)
img_url = self.base_area + tree.xpath('//*[@id="img"]/img/@src')[0]
# https://pic.netbian.com/uploads/allimg/240521/232729-17163052491e1c.jpg
img_title = img_url.split('/')[-1]
# 240521/232729-17163052491e1c.jpg
img_data_dict[img_title] = img_url
return img_data_dict
def download_img(self, img_url, img_title):
# 获取到图片的二进制数据
response = requests.get(img_url, headers=self.headers)
img_data = response.content
with open(f'{img_title}', 'wb') as fp:
fp.write(img_data)
print(f'当前下载 {img_title} 成功!')
def main_process(self):
start_time = time.time()
img_data_dict = self.spider_tag_url()
end_time = time.time()
print(f'抓取所有图片连接数据 {len(img_data_dict)} 总耗时 :>>>> {end_time - start_time}s')
task_list = []
for img_title, img_url in img_data_dict.items():
task = Process(
target=self.download_img,
kwargs={'img_url': img_url, 'img_title': img_title}
)
task.start()
task_list.append(task)
for task in task_list:
task.join()
def main_thread(self):
start_time = time.time()
img_data_dict = self.spider_tag_url()
end_time = time.time()
print(f'抓取所有图片连接数据 {len(img_data_dict)} 总耗时 :>>>> {end_time - start_time}s')
task_list = []
for img_title, img_url in img_data_dict.items():
task = Thread(
target=self.download_img,
kwargs={'img_url': img_url, 'img_title': img_title}
)
task.start()
task_list.append(task)
for task in task_list:
task.join()
if __name__ == '__main__':
spider = SpiderImg()
start_time = time.time()
# spider.main_process() # 下载所有图片总耗时 :>>>> 7.990673542022705s
spider.main_thread() # 下载所有图片总耗时 :>>>> 5.58322811126709s
end_time = time.time()
print(f'下载所有图片总耗时 :>>>> {end_time - start_time}s')
- 总结:
- 使用多线程更快