- Why Do We Need Concurrency?
In everyday life we do many things at once: cooking while listening to music, or replying to messages while watching a video. Doing so makes us more efficient. Likewise, in programming we want a program to handle multiple tasks at the same time so that it makes better use of its time. That is what concurrent programming is for.
- Basic Concurrency Concepts
Parallelism: multiple things happen at literally the same instant, which requires multiple CPU cores.
Concurrency: multiple tasks make progress within the same time period, possibly by interleaving on a single core.
Synchronous: tasks run one after another; the caller waits for each task to finish before starting the next.
Asynchronous: the caller starts a task and moves on without waiting; the result is handled later, when it becomes available.
- The GIL (Global Interpreter Lock) in Python
CPython has a feature called the GIL (Global Interpreter Lock), which guarantees that only one thread executes Python bytecode at a time. This means multithreading cannot exploit multiple cores for CPU-bound work. For I/O-bound work, however, threads release the GIL while they wait, so concurrency still helps a great deal.
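To see the GIL's effect, here is a minimal sketch (the exact timings are illustrative and vary by machine) that runs the same CPU-bound loop sequentially and then in two threads; on CPython the threaded run is typically no faster:
import threading
import time

def cpu_bound(n):
    # A busy loop of pure Python bytecode, so the GIL serializes it across threads.
    while n > 0:
        n -= 1

if __name__ == "__main__":
    N = 10_000_000

    start = time.perf_counter()
    cpu_bound(N)
    cpu_bound(N)
    print(f"Sequential: {time.perf_counter() - start:.2f}s")

    start = time.perf_counter()
    t1 = threading.Thread(target=cpu_bound, args=(N,))
    t2 = threading.Thread(target=cpu_bound, args=(N,))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print(f"Two threads: {time.perf_counter() - start:.2f}s")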
- Multithreading
Multithreading is one way to achieve concurrency. Python's threading module provides the tools to create and manage threads.
import threading
import time

def task():
    print(f"Thread {threading.current_thread().name} started")
    time.sleep(2)
    print(f"Thread {threading.current_thread().name} finished")

if __name__ == "__main__":
    thread1 = threading.Thread(target=task, name="Thread1")
    thread2 = threading.Thread(target=task, name="Thread2")
    thread1.start()
    thread2.start()
    # join() blocks until the corresponding thread has finished.
    thread1.join()
    thread2.join()
    print("All threads completed")
Output:
Thread Thread1 started
Thread Thread2 started
Thread Thread1 finished
Thread Thread2 finished
All threads completed
- Multiprocessing
Multiprocessing is another way to achieve concurrency. Python's multiprocessing module provides the tools to create and manage processes.
import multiprocessing
import time

def task():
    print(f"Process {multiprocessing.current_process().name} started")
    time.sleep(2)
    print(f"Process {multiprocessing.current_process().name} finished")

if __name__ == "__main__":
    process1 = multiprocessing.Process(target=task, name="Process1")
    process2 = multiprocessing.Process(target=task, name="Process2")
    process1.start()
    process2.start()
    process1.join()
    process2.join()
    print("All processes completed")
Output:
Process Process1 started
Process Process2 started
Process Process1 finished
Process Process2 finished
All processes completed
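Because every process has its own interpreter and its own GIL, multiprocessing can genuinely speed up CPU-bound work on multiple cores. A minimal sketch using multiprocessing.Pool (timings are illustrative), reusing the cpu_bound loop from the GIL section:
import multiprocessing
import time

def cpu_bound(n):
    while n > 0:
        n -= 1

if __name__ == "__main__":
    N = 10_000_000
    start = time.perf_counter()
    with multiprocessing.Pool(processes=2) as pool:
        # Each call runs in a separate worker process, so the loops truly run in parallel.
        pool.map(cpu_bound, [N, N])
    print(f"Two processes: {time.perf_counter() - start:.2f}s")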
- Asynchronous I/O
Asynchronous I/O lets a program keep doing other work while it waits for slow operations to complete. Python's asyncio module provides the support for asynchronous programming.
import asyncio

async def task(name):
    print(f"Task {name} started")
    await asyncio.sleep(2)
    print(f"Task {name} finished")

async def main():
    # create_task schedules both coroutines immediately, so they run concurrently.
    task1 = asyncio.create_task(task("Task1"))
    task2 = asyncio.create_task(task("Task2"))
    await task1
    await task2
    print("All tasks completed")

if __name__ == "__main__":
    asyncio.run(main())
Output:
Task Task1 started
Task Task2 started
Task Task1 finished
Task Task2 finished
All tasks completed
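Note that the two tasks overlap because create_task schedules each coroutine as soon as it is created. Awaiting the coroutines directly, without create_task, runs them one after another; a sketch of that sequential variant for contrast:
import asyncio

async def task(name):
    print(f"Task {name} started")
    await asyncio.sleep(2)
    print(f"Task {name} finished")

async def main():
    # Each coroutine runs to completion before the next one starts.
    await task("Task1")
    await task("Task2")

asyncio.run(main())  # takes about 4 seconds instead of 2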
- Coroutines
A coroutine is a special kind of function that can pause partway through and resume later. Python's async/await syntax is how coroutines are defined and driven. In fact, the asynchronous I/O example above is built entirely from coroutines: task and main are coroutine functions declared with async def, and every await is a point where the coroutine suspends and hands control back to the event loop.
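To make the pause-and-resume behavior visible, here is a minimal sketch in which two coroutines interleave at every await point:
import asyncio

async def worker(name):
    for step in range(3):
        print(f"{name}: step {step}")
        # Each await suspends this coroutine and lets the other one run.
        await asyncio.sleep(0)

async def main():
    await asyncio.gather(worker("A"), worker("B"))

asyncio.run(main())
The two workers' steps print interleaved (A, B, A, B, ...), each coroutine pausing at await and resuming later.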
- Thread Pools
A thread pool creates a fixed number of threads up front. When a task arrives, it runs on one of the pooled threads instead of paying the cost of creating a new thread each time.
from concurrent.futures import ThreadPoolExecutor
import time

def task(name):
    print(f"Task {name} started")
    time.sleep(2)
    print(f"Task {name} finished")

if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=2) as executor:
        executor.submit(task, "Task1")
        executor.submit(task, "Task2")
    # Leaving the with-block waits for all submitted tasks to finish.
    print("All tasks completed")
Output:
Task Task1 started
Task Task2 started
Task Task1 finished
Task Task2 finished
All tasks completed
A thread pool also combines well with asyncio: loop.run_in_executor lets a coroutine hand a blocking call to the pool without stalling the event loop.
import asyncio
from concurrent.futures import ThreadPoolExecutor
import time

def blocking_io():
    print("Start blocking IO operation...")
    time.sleep(3)  # simulate a blocking operation
    return "Blocking IO result"

async def main():
    loop = asyncio.get_running_loop()
    # Run the blocking operation in a thread pool; the event loop stays free.
    with ThreadPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, blocking_io)
        print(result)

asyncio.run(main())
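When you need the workers' return values, executor.submit gives back a Future. A minimal sketch using concurrent.futures.as_completed to collect results as they finish (square is a stand-in task):
from concurrent.futures import ThreadPoolExecutor, as_completed

def square(x):
    return x * x

if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=2) as executor:
        futures = {executor.submit(square, n): n for n in range(5)}
        for future in as_completed(futures):
            # result() returns the task's return value, re-raising any exception.
            print(f"{futures[future]} -> {future.result()}")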
- Process Pools
A process pool works like a thread pool, except that it manages worker processes instead of threads.
from concurrent.futures import ProcessPoolExecutor
import time

# The task must be defined at module top level so worker processes can import it.
def task(name):
    print(f"Task {name} started")
    time.sleep(2)
    print(f"Task {name} finished")

if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=2) as executor:
        executor.submit(task, "Task1")
        executor.submit(task, "Task2")
    print("All tasks completed")
Output:
Task Task1 started
Task Task2 started
Task Task1 finished
Task Task2 finished
All tasks completed
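ProcessPoolExecutor also provides map, which fans arguments out across the worker processes and yields results in input order; a minimal sketch:
from concurrent.futures import ProcessPoolExecutor

def square(x):
    return x * x

if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=2) as executor:
        # Results come back in input order, even if workers finish out of order.
        print(list(executor.map(square, range(5))))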
- The Event Loop
The event loop is the heart of asynchronous programming. It schedules tasks, runs each one until it awaits something, and resumes it when the awaited operation completes. The asyncio examples above already run on an event loop: asyncio.run(main()) creates a loop, drives main to completion on it, and then closes it.
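For a closer look at the loop itself, here is a minimal sketch using the loop's callback-scheduling API (call_later schedules a plain function to run after a delay):
import asyncio

def tick(message):
    print(message)

async def main():
    loop = asyncio.get_running_loop()
    # Plain callbacks can be scheduled on the loop alongside coroutines.
    loop.call_later(1, tick, "one second elapsed")
    loop.call_later(2, tick, "two seconds elapsed")
    await asyncio.sleep(3)  # keep the loop alive while the callbacks fire

asyncio.run(main())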
- Locks and Condition Variables
Locks and condition variables are the workhorse synchronization mechanisms of concurrent programming. A lock protects a shared resource from simultaneous access by multiple threads; a condition variable coordinates the order in which threads proceed.
Locks
A lock prevents multiple threads from modifying shared data at the same time.
import threading
import time

class Counter:
    def __init__(self):
        self.value = 0

def increment(counter, lock):
    # The lock makes the whole read-modify-write sequence atomic.
    with lock:
        local_counter = counter.value
        local_counter += 1
        time.sleep(0.1)  # simulate a slow operation
        counter.value = local_counter
        print(f"Counter incremented to {counter.value}")

if __name__ == "__main__":
    counter = Counter()
    lock = threading.Lock()
    threads = []
    for _ in range(10):
        thread = threading.Thread(target=increment, args=(counter, lock))
        threads.append(thread)
        thread.start()
    for thread in threads:
        thread.join()
    print(f"Final counter value: {counter.value}")
Output:
Counter incremented to 1
Counter incremented to 2
Counter incremented to 3
...
Counter incremented to 10
Final counter value: 10
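To see why the lock is needed, remove it: the read-modify-write race then loses updates. A minimal sketch of the unsafe variant, which usually prints a final value far below 10:
import threading
import time

class Counter:
    def __init__(self):
        self.value = 0

def unsafe_increment(counter):
    local_counter = counter.value  # read
    local_counter += 1             # modify
    time.sleep(0.1)                # other threads read the stale value here
    counter.value = local_counter  # write back, clobbering their updates

if __name__ == "__main__":
    counter = Counter()
    threads = [threading.Thread(target=unsafe_increment, args=(counter,)) for _ in range(10)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    print(f"Final counter value without a lock: {counter.value}")  # typically 1, not 10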
Condition Variables
A condition variable coordinates the order in which threads proceed: one thread waits until another signals that some condition now holds.
import threading

class Counter:
    def __init__(self):
        self.value = 0
        self._lock = threading.Lock()
        self._condition = threading.Condition(self._lock)

    def increment(self):
        with self._lock:
            self.value += 1
            # Wake every thread waiting on the condition so it can re-check.
            self._condition.notify_all()

    def wait_for_value(self, target_value):
        with self._condition:
            # Always re-check the condition in a loop to guard against spurious wakeups.
            while self.value < target_value:
                self._condition.wait()
            print(f"Counter reached {target_value}")

def incrementer(counter):
    for _ in range(10):
        counter.increment()

def waiter(counter):
    for i in range(1, 11):
        counter.wait_for_value(i)

if __name__ == "__main__":
    counter = Counter()
    threads = [
        threading.Thread(target=incrementer, args=(counter,)),
        threading.Thread(target=waiter, args=(counter,)),
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    print(f"Final counter value: {counter.value}")
Output:
Counter reached 1
Counter reached 2
...
Counter reached 10
Final counter value: 10
- The Producer-Consumer Pattern
The producer-consumer pattern is a classic concurrency pattern for handing data from the code that produces it to the code that consumes it. A queue coordinates the two sides:
import queue
import threading
import time

class Producer(threading.Thread):
    def __init__(self, q):
        super().__init__()
        self.queue = q

    def run(self):
        for i in range(10):
            item = f"Item {i}"
            self.queue.put(item)
            print(f"Produced {item}")
            time.sleep(1)

class Consumer(threading.Thread):
    def __init__(self, q):
        super().__init__()
        self.queue = q

    def run(self):
        while True:
            item = self.queue.get()
            if item is None:  # sentinel value: no more items are coming
                break
            print(f"Consumed {item}")
            time.sleep(0.5)
            self.queue.task_done()

if __name__ == "__main__":
    q = queue.Queue()
    producer = Producer(q)
    consumer = Consumer(q)
    producer.start()
    consumer.start()
    producer.join()
    q.join()      # wait until task_done has been called for every item
    q.put(None)   # tell the consumer to shut down
    consumer.join()
    print("All items consumed")
Output:
Produced Item 0
Produced Item 1
Consumed Item 0
Produced Item 2
Consumed Item 1
Produced Item 3
Consumed Item 2
Produced Item 4
Consumed Item 3
Produced Item 5
Consumed Item 4
Produced Item 6
Consumed Item 5
Produced Item 7
Consumed Item 6
Produced Item 8
Consumed Item 7
Produced Item 9
Consumed Item 8
Consumed Item 9
All items consumed
- Queues
A queue is the standard data structure for passing data between producers and consumers. queue.Queue is a thread-safe FIFO queue; here is the same pattern written with plain functions instead of Thread subclasses:
import queue
import threading
import time

def producer(q):
    for i in range(10):
        item = f"Item {i}"
        q.put(item)
        print(f"Produced {item}")
        time.sleep(1)

def consumer(q):
    while True:
        item = q.get()
        if item is None:  # sentinel value: shut down
            break
        print(f"Consumed {item}")
        time.sleep(0.5)
        q.task_done()

if __name__ == "__main__":
    q = queue.Queue()
    producer_thread = threading.Thread(target=producer, args=(q,))
    consumer_thread = threading.Thread(target=consumer, args=(q,))
    producer_thread.start()
    consumer_thread.start()
    producer_thread.join()
    q.join()
    q.put(None)
    consumer_thread.join()
    print("All items consumed")
Output:
Produced Item 0
Produced Item 1
Consumed Item 0
Produced Item 2
Consumed Item 1
Produced Item 3
Consumed Item 2
Produced Item 4
Consumed Item 3
Produced Item 5
Consumed Item 4
Produced Item 6
Consumed Item 5
Produced Item 7
Consumed Item 6
Produced Item 8
Consumed Item 7
Produced Item 9
Consumed Item 8
Consumed Item 9
All items consumed
- Semaphores
A semaphore limits how many tasks run at the same time, which is useful whenever a resource is scarce. asyncio.Semaphore caps the number of coroutines inside a critical section:
import asyncio
from asyncio import Semaphore

async def task(name, semaphore: Semaphore):
    # At most two tasks can hold the semaphore at once.
    async with semaphore:
        print(f"Task {name} started")
        await asyncio.sleep(2)
        print(f"Task {name} finished")

async def main():
    semaphore = Semaphore(value=2)
    tasks = [asyncio.create_task(task(f"Task{i}", semaphore)) for i in range(3)]
    await asyncio.gather(*tasks)
    print("All tasks completed")

if __name__ == "__main__":
    asyncio.run(main())
Output:
Task Task0 started
Task Task1 started
Task Task0 finished
Task Task1 finished
Task Task2 started
Task Task2 finished
All tasks completed
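threading.Semaphore plays the same role for threads; a minimal sketch with the same two-at-a-time limit:
import threading
import time

semaphore = threading.Semaphore(2)

def task(name):
    with semaphore:  # at most two threads inside this block at once
        print(f"Task {name} started")
        time.sleep(2)
        print(f"Task {name} finished")

if __name__ == "__main__":
    threads = [threading.Thread(target=task, args=(f"Task{i}",)) for i in range(3)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()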
- Events
An event is another synchronization primitive, used to notify other threads or coroutines that something has happened. asyncio.Event lets one coroutine signal the others:
import asyncio

async def waiter(event):
    print("Waiting for event...")
    await event.wait()  # suspend until event.set() is called
    print("Event received!")

async def main():
    event = asyncio.Event()
    waiter_task = asyncio.create_task(waiter(event))
    await asyncio.sleep(3)
    print("Setting event...")
    event.set()
    await waiter_task
    print("All tasks completed")

if __name__ == "__main__":
    asyncio.run(main())
Output:
Waiting for event...
Setting event...
Event received!
All tasks completed
- Future and Task
Future and Task are central abstractions in asynchronous programming: both represent the eventual result of an asynchronous operation.
Future
asyncio.Future is a low-level container for a result that will be set later:
import asyncio

async def task():
    print("Task started")
    await asyncio.sleep(2)
    print("Task finished")
    return "Task result"

async def main():
    future = asyncio.Future()

    async def run_task():
        result = await task()
        future.set_result(result)  # fulfil the future with the task's result

    runner = asyncio.create_task(run_task())
    await runner
    print(f"Future result: {future.result()}")

if __name__ == "__main__":
    asyncio.run(main())
Output:
Task started
Task finished
Future result: Task result
Task
A Task is a Future subclass that wraps and drives a coroutine; asyncio.create_task creates one:
import asyncio

async def task(name):
    print(f"Task {name} started")
    await asyncio.sleep(2)
    print(f"Task {name} finished")
    return f"Task {name} result"

async def main():
    task1 = asyncio.create_task(task("Task1"))
    task2 = asyncio.create_task(task("Task2"))
    # gather waits for both tasks and returns their results in order.
    results = await asyncio.gather(task1, task2)
    print(results)

if __name__ == "__main__":
    asyncio.run(main())
Output:
Task Task1 started
Task Task2 started
Task Task1 finished
Task Task2 finished
['Task Task1 result', 'Task Task2 result']
- Advanced Concurrency Techniques
Beyond the basic patterns, a few more advanced techniques help keep concurrent tasks under control.
Timeouts
asyncio.wait_for bounds how long an operation may take:
import asyncio

async def task(name):
    print(f"Task {name} started")
    await asyncio.sleep(3)
    print(f"Task {name} finished")
    return f"Task {name} result"

async def main():
    try:
        # On timeout, wait_for cancels the task and raises TimeoutError.
        result = await asyncio.wait_for(task("Task1"), timeout=2)
        print(f"Result: {result}")
    except asyncio.TimeoutError:
        print("Task timed out")

if __name__ == "__main__":
    asyncio.run(main())
Output:
Task Task1 started
Task timed out
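Python 3.11 also added the asyncio.timeout() context manager as an alternative; a minimal sketch:
import asyncio

async def main():
    try:
        # The enclosed await is cancelled when the deadline expires (asyncio.timeout, Python 3.11+).
        async with asyncio.timeout(2):
            await asyncio.sleep(3)
    except TimeoutError:
        print("Timed out")

asyncio.run(main())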
Task Cancellation
Task.cancel() requests that a running task be cancelled:
import asyncio

async def task(name):
    print(f"Task {name} started")
    await asyncio.sleep(3)
    print(f"Task {name} finished")
    return f"Task {name} result"

async def main():
    task1 = asyncio.create_task(task("Task1"))
    await asyncio.sleep(1)
    # cancel() only requests cancellation; awaiting the task raises CancelledError.
    task1.cancel()
    try:
        await task1
    except asyncio.CancelledError:
        print("Task was cancelled")
    print("Main completed")

if __name__ == "__main__":
    asyncio.run(main())
Output:
Task Task1 started
Task was cancelled
Main completed
- Case Study: A Concurrent Crawler
Suppose we need to scrape data from several websites. Asynchronous I/O lets us issue the requests concurrently and overlap their waiting time:
import aiohttp
import asyncio

async def fetch(session, url):
    async with session.get(url) as response:
        return await response.text()

async def save_data(data, filename):
    # Note: plain file writes are blocking; they are short enough here not to matter.
    with open(filename, "w") as file:
        file.write(data)

async def main():
    urls = [
        "https://example.com/data1",
        "https://example.com/data2",
        "https://example.com/data3"
    ]
    filenames = [f"data{i}.txt" for i in range(3)]
    async with aiohttp.ClientSession() as session:
        # Fire all requests concurrently and wait for every response.
        tasks = [fetch(session, url) for url in urls]
        data = await asyncio.gather(*tasks)
        await asyncio.gather(*[save_data(d, filename) for d, filename in zip(data, filenames)])
    print("All data fetched and saved")

if __name__ == "__main__":
    asyncio.run(main())
Output:
All data fetched and saved
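A real crawler usually caps how many requests are in flight at once. Combining the earlier semaphore pattern with fetch might look like this (the limit of 5 is an arbitrary illustration):
import asyncio
import aiohttp

async def fetch_limited(session, url, semaphore):
    async with semaphore:  # at most 5 requests in flight at a time
        async with session.get(url) as response:
            return await response.text()

async def crawl(urls):
    semaphore = asyncio.Semaphore(5)
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_limited(session, url, semaphore) for url in urls]
        return await asyncio.gather(*tasks)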