前言
在现代编程中,处理并发任务是提高程序性能的关键之一。Python 提供了 多线程(threading) 和 多进程(multiprocessing) 两种方式来实现并发编程。多线程适用于 I/O 密集型任务,而多进程则更适合 CPU 密集型任务。通过这两种技术,你可以高效地处理大规模数据、加速程序执行并优化资源利用。
在本篇详细教程中,我们将讨论如何使用 Python 的 threading
模块实现多线程,以及如何使用 multiprocessing
库实现多进程,帮助你掌握并发编程的基础和应用场景。
目录
多线程编程(threading)
- 什么是多线程?
- 使用 Thread() 创建线程
- start() 和 join():启动和等待线程
- 线程间共享数据
- 锁(Locks):避免线程冲突
- 示例:多线程处理 I/O 操作
多进程编程(multiprocessing)
- 什么是多进程?
- 使用 Process() 创建进程
- 进程池(Pool)管理多个进程
- map() 和 apply_async():并行任务执行
- 进程间通信(Queue 和 Pipe)
- 示例:多进程计算密集型任务
多线程编程(threading)
1. 什么是多线程?
多线程是指在一个进程中同时运行多个线程,每个线程执行不同的任务,共享进程的内存空间。多线程适用于I/O密集型任务,如文件读写、网络请求等,可以提高程序的并发性和响应速度。
特点:
- 共享内存空间,数据共享和通信方便。
- 线程间切换速度快,开销小。
- 受全局解释器锁(GIL)影响,CPU密集型任务并不能真正并行。
2. 使用 Thread() 创建线程
threading
模块提供了创建线程的类和方法。
import threading # 导入 threading 模块
def task():
print("这是一个线程任务")
# 创建线程对象,target参数指定线程要执行的函数
thread = threading.Thread(target=task)
# 启动线程
thread.start()
代码解析:
import threading
:导入线程模块。def task()
:定义线程要执行的函数。threading.Thread(target=task)
:创建线程对象。thread.start()
:启动线程,开始执行任务。
3. start() 和 join():启动和等待线程
start()
:启动线程,使其开始执行。join()
:阻塞主线程,等待子线程完成。
import threading
import time
def task(name):
print(f"线程 {name} 开始")
time.sleep(2)
print(f"线程 {name} 结束")
# 创建多个线程
threads = []
for i in range(3):
thread = threading.Thread(target=task, args=(i,))
threads.append(thread)
thread.start()
# 等待所有线程完成
for thread in threads:
thread.join()
print("所有线程已完成")
代码解析:
args=(i,)
:传递参数给线程函数。threads.append(thread)
:将线程对象添加到列表中,便于管理。thread.join()
:主线程等待子线程完成。
4. 线程间共享数据
线程之间可以共享全局变量,但需要注意线程安全问题。
import threading
counter = 0 # 全局变量
def increment():
global counter
for _ in range(100000):
counter += 1
# 创建两个线程
thread1 = threading.Thread(target=increment)
thread2 = threading.Thread(target=increment)
# 启动线程
thread1.start()
thread2.start()
# 等待线程完成
thread1.join()
thread2.join()
print(f"最终计数器的值为:{counter}")
注意:由于线程争用,最终计数器的值可能不等于预期的200000。这是因为多个线程同时修改共享数据,导致数据竞争。
5. 锁(Locks):避免线程冲突
使用锁机制可以确保同一时间只有一个线程访问共享数据,避免数据竞争。
import threading
counter = 0
lock = threading.Lock() # 创建锁对象
def increment():
global counter
for _ in range(100000):
lock.acquire() # 获取锁
counter += 1
lock.release() # 释放锁
# 创建两个线程
thread1 = threading.Thread(target=increment)
thread2 = threading.Thread(target=increment)
# 启动线程
thread1.start()
thread2.start()
# 等待线程完成
thread1.join()
thread2.join()
print(f"最终计数器的值为:{counter}")
代码解析:
lock = threading.Lock()
:创建一个锁对象。lock.acquire()
:请求获取锁,若锁已被占用,则等待。lock.release()
:释放锁,供其他线程使用。
6. 示例:多线程处理 I/O 操作
import threading
import requests
urls = [
'https://www.python.org',
'https://www.google.com',
'https://www.github.com',
# 更多的URL
]
def fetch_content(url):
response = requests.get(url)
print(f"{url} 的响应码:{response.status_code}")
threads = []
for url in urls:
thread = threading.Thread(target=fetch_content, args=(url,))
threads.append(thread)
thread.start()
for thread in threads:
thread.join()
print("所有请求已完成")
代码解析:
requests.get(url)
:发送HTTP GET请求。threading.Thread(target=fetch_content, args=(url,))
:为每个URL创建一个线程。- 适用于I/O密集型任务,提高程序响应速度。
多进程编程(multiprocessing)
1. 什么是多进程?
多进程是指同时运行多个进程,每个进程都有独立的内存空间。多进程适用于CPU密集型任务,如计算、图像处理等,能有效利用多核CPU,提高程序性能。
特点:
- 进程间内存独立,数据不共享,安全性高。
- 能够真正实现并行,充分利用多核CPU。
- 进程创建和切换开销较大。
2. 使用 Process() 创建进程
import multiprocessing
def task(name):
print(f"进程 {name} 开始")
# 执行任务
print(f"进程 {name} 结束")
if __name__ == '__main__':
process = multiprocessing.Process(target=task, args=('A',))
process.start()
process.join()
print("主进程结束")
代码解析:
multiprocessing.Process(target=task, args=('A',))
:创建进程对象。process.start()
:启动进程。process.join()
:等待进程完成。- 注意:在Windows系统中,必须将进程创建代码放在
if __name__ == '__main__':
下。
3. 进程池(Pool)管理多个进程
import multiprocessing
import time
def task(name):
print(f"任务 {name} 开始")
time.sleep(2)
print(f"任务 {name} 结束")
if __name__ == '__main__':
pool = multiprocessing.Pool(processes=3) # 创建拥有3个进程的进程池
for i in range(5):
pool.apply_async(task, args=(i,))
pool.close() # 关闭进程池,不能再添加新任务
pool.join() # 等待所有任务完成
print("所有任务已完成")
代码解析:
multiprocessing.Pool(processes=3)
:创建进程池,最大进程数为3。pool.apply_async(task, args=(i,))
:异步提交任务到进程池。pool.close()
:关闭进程池,不再接受新任务。pool.join()
:主进程等待进程池中的任务完成。
4. map() 和 apply_async():并行任务执行
使用 map()
方法
import multiprocessing
def square(x):
return x * x
if __name__ == '__main__':
pool = multiprocessing.Pool(processes=4)
results = pool.map(square, range(10))
print(results)
代码解析:
pool.map(square, range(10))
:将range(10)
中的每个元素传递给square
函数,并行计算。
使用 apply_async()
方法
import multiprocessing
def square(x):
return x * x
if __name__ == '__main__':
pool = multiprocessing.Pool(processes=4)
results = []
for i in range(10):
result = pool.apply_async(square, args=(i,))
results.append(result)
pool.close()
pool.join()
print([r.get() for r in results])
代码解析:
pool.apply_async()
:异步地将任务提交到进程池。result.get()
:获取异步执行的结果。
5. 进程间通信(Queue 和 Pipe)
使用 Queue
import multiprocessing
def producer(queue):
for item in range(5):
queue.put(item)
print(f"生产者生产了:{item}")
def consumer(queue):
while True:
item = queue.get()
if item is None:
break
print(f"消费者消费了:{item}")
if __name__ == '__main__':
queue = multiprocessing.Queue()
p = multiprocessing.Process(target=producer, args=(queue,))
c = multiprocessing.Process(target=consumer, args=(queue,))
p.start()
c.start()
p.join()
queue.put(None) # 发送终止信号
c.join()
代码解析:
multiprocessing.Queue()
:创建进程间共享的队列。queue.put(item)
:将数据放入队列。queue.get()
:从队列中获取数据。
使用 Pipe
import multiprocessing
def sender(conn):
conn.send("你好,消费者")
conn.close()
def receiver(conn):
message = conn.recv()
print(f"接收到的信息:{message}")
if __name__ == '__main__':
parent_conn, child_conn = multiprocessing.Pipe()
s = multiprocessing.Process(target=sender, args=(parent_conn,))
r = multiprocessing.Process(target=receiver, args=(child_conn,))
s.start()
r.start()
s.join()
r.join()
代码解析:
multiprocessing.Pipe()
:创建一个管道,返回两个连接对象。conn.send()
:发送数据。conn.recv()
:接收数据。
6. 示例:多进程计算密集型任务
import multiprocessing
import math
import time
def compute(start, end):
result = 0
for i in range(start, end):
result += math.sqrt(i)
return result
if __name__ == '__main__':
start_time = time.time()
cpu_count = multiprocessing.cpu_count()
pool = multiprocessing.Pool(processes=cpu_count)
total_numbers = 10000000
chunk_size = total_numbers // cpu_count
tasks = []
for i in range(cpu_count):
start = i * chunk_size
end = (i + 1) * chunk_size if i != cpu_count - 1 else total_numbers
tasks.append(pool.apply_async(compute, args=(start, end)))
pool.close()
pool.join()
total = sum([task.get() for task in tasks])
print(f"计算结果:{total}")
print(f"耗时:{time.time() - start_time} 秒")
代码解析:
cpu_count = multiprocessing.cpu_count()
:获取CPU核心数。- 将计算任务分割成多个子任务,分配给不同的进程执行。
pool.apply_async(compute, args=(start, end))
:异步提交计算任务。sum([task.get() for task in tasks])
:汇总各进程的计算结果。
结论
通过本教程,我们深入学习了Python中的多线程和多进程编程:
-
多线程编程(threading):
- 理解了什么是多线程,以及如何创建和管理线程。
- 学会了使用锁机制避免线程间的数据竞争。
- 了解了多线程在I/O密集型任务中的应用。
-
多进程编程(multiprocessing):
- 掌握了多进程的概念,能创建和管理进程。
- 学习了进程池的使用,提高了多进程编程的效率。
- 了解了进程间通信的方法,如队列和管道。
- 认识到多进程在CPU密集型任务中的优势。
通过合理地使用多线程和多进程,可以显著提升程序的性能和效率。希望本教程能帮助您深入理解并发编程的核心概念,并在实际项目中加以应用。
标签:__,start,Python,编程,threading,线程,进程,多线程,multiprocessing From: https://blog.csdn.net/tim654654/article/details/142266142