Python Concurrent Programming: All About Async, Threads, and Stacks

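  1. Why do we need concurrency?

In everyday life we often do several things at once: listening to music while cooking, or watching a video while replying to messages. Doing so makes us more efficient. In programming, we likewise want a program to handle multiple tasks in overlapping time periods so that it finishes its work sooner. That is the point of concurrent programming.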

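  2. Basic concepts of concurrency

    Parallelism: several things happen at the same instant (for example, on different CPU cores).
    Concurrency: several things make progress within the same time period by taking turns.
    Synchronous: the caller waits for each task to finish before starting the next one.
    Asynchronous: the caller starts a task and moves on without waiting; the result is handled when it becomes available.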
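
The difference between the synchronous and asynchronous styles can be made concrete with a small timing sketch. The `step` coroutine and the delays below are hypothetical stand-ins for real I/O waits, not part of the original article.

```python
import asyncio
import time

async def step(delay):
    # Stand-in for an I/O wait (hypothetical workload)
    await asyncio.sleep(delay)
    return delay

async def run_sequentially(delays):
    # Synchronous style: each step must finish before the next one starts
    return [await step(d) for d in delays]

async def run_concurrently(delays):
    # Asynchronous style: all steps are started, then awaited together
    return await asyncio.gather(*(step(d) for d in delays))

def timed(coro_fn, delays):
    # Run one of the two variants and measure the wall-clock time
    start = time.perf_counter()
    results = asyncio.run(coro_fn(delays))
    return results, time.perf_counter() - start

if __name__ == "__main__":
    delays = [0.2, 0.2, 0.2]
    _, t_seq = timed(run_sequentially, delays)
    _, t_con = timed(run_concurrently, delays)
    print(f"sequential: {t_seq:.2f}s, concurrent: {t_con:.2f}s")
```

The sequential variant should take roughly the sum of the delays, while the concurrent one should take roughly the longest single delay.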

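  3. The GIL (Global Interpreter Lock) in Python

CPython, the standard Python interpreter, has a Global Interpreter Lock (GIL) that allows only one thread to execute Python bytecode at any given moment. As a result, threads cannot take full advantage of multiple CPU cores for CPU-bound tasks. For I/O-bound tasks, however, the GIL is released while a thread waits, so Python threads can overlap their waiting time effectively.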
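
As a rough illustration of the I/O-bound case, the sketch below starts several threads that each only wait. `time.sleep` releases the GIL, much like most blocking I/O calls, so the waits overlap. The helper names `io_bound` and `run_in_threads` are made up for this example.

```python
import threading
import time

def io_bound(delay):
    # time.sleep releases the GIL while waiting, like most blocking I/O calls
    time.sleep(delay)

def run_in_threads(n, delay):
    # Start n waiting threads and return the total wall-clock time
    threads = [threading.Thread(target=io_bound, args=(delay,)) for _ in range(n)]
    start = time.perf_counter()
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.perf_counter() - start

if __name__ == "__main__":
    elapsed = run_in_threads(4, 0.5)
    print(f"4 threads x 0.5s wait took {elapsed:.2f}s")
```

The total should be close to a single wait rather than four times that. A CPU-bound function run the same way would not see a comparable speedup under the GIL.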

  4. Multithreading
    Multithreading is one way to achieve concurrency. Python's threading module provides tools for creating and managing threads.
import threading
import time

def task():
    print(f"Thread {threading.current_thread().name} started")
    time.sleep(2)
    print(f"Thread {threading.current_thread().name} finished")

if __name__ == "__main__":
    thread1 = threading.Thread(target=task, name="Thread1")
    thread2 = threading.Thread(target=task, name="Thread2")

    thread1.start()
    thread2.start()

    thread1.join()
    thread2.join()

    print("All threads completed")

Output:
Thread Thread1 started
Thread Thread2 started
Thread Thread1 finished
Thread Thread2 finished
All threads completed
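
Threads usually need input and somewhere to put their results. The following is a minimal sketch, assuming each thread writes to its own dictionary key (which is safe in CPython without an extra lock); the function names are illustrative only.

```python
import threading

def square(n, results):
    # Each thread writes to its own key, so no lock is needed here
    results[n] = n * n

def run_squares(numbers):
    results = {}
    # Arguments are passed to the target function through args
    threads = [threading.Thread(target=square, args=(n, results)) for n in numbers]
    for t in threads:
        t.start()
    for t in threads:
        t.join()  # wait for every thread before reading the results
    return results

if __name__ == "__main__":
    print(run_squares([1, 2, 3]))
```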
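  5. Multiprocessing

Multiprocessing is another way to achieve concurrency. Python's multiprocessing module provides tools for creating and managing processes. Because each process has its own interpreter and its own GIL, processes can use multiple CPU cores for CPU-bound work.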

import multiprocessing
from multiprocessing import Process
import time

def task():
    print(f"Process {multiprocessing.current_process().name} started")
    time.sleep(2)
    print(f"Process {multiprocessing.current_process().name} finished")

if __name__ == "__main__":
    process1 = Process(target=task, name="Process1")
    process2 = Process(target=task, name="Process2")

    process1.start()
    process2.start()

    process1.join()
    process2.join()

    print("All processes completed")

Output:
Process Process1 started
Process Process2 started
Process Process1 finished
Process Process2 finished
All processes completed
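  6. Asynchronous I/O

Asynchronous I/O lets a program keep running other tasks while it waits for an operation to complete. Python's asyncio module provides support for asynchronous programming.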

import asyncio

async def task(name):
    print(f"Task {name} started")
    await asyncio.sleep(2)
    print(f"Task {name} finished")

async def main():
    task1 = asyncio.create_task(task("Task1"))
    task2 = asyncio.create_task(task("Task2"))

    await task1
    await task2

    print("All tasks completed")

if __name__ == "__main__":
    asyncio.run(main())

Output:
Task Task1 started
Task Task2 started
Task Task1 finished
Task Task2 finished
All tasks completed
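  7. Coroutines

A coroutine is a special kind of function that can pause partway through and later resume where it left off. In Python, a coroutine is defined with async def, and each await marks a point where it may be suspended.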

import asyncio

async def task(name):
    print(f"Task {name} started")
    await asyncio.sleep(2)
    print(f"Task {name} finished")

async def main():
    task1 = asyncio.create_task(task("Task1"))
    task2 = asyncio.create_task(task("Task2"))

    await task1
    await task2

    print("All tasks completed")

if __name__ == "__main__":
    asyncio.run(main())

Output:
Task Task1 started
Task Task2 started
Task Task1 finished
Task Task2 finished
All tasks completed
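
One detail worth seeing in code: calling a coroutine function does not run its body, it only creates a coroutine object. The short sketch below (with a made-up `greet` coroutine) shows that the body runs only once the event loop drives it.

```python
import asyncio

async def greet(name):
    await asyncio.sleep(0)  # a suspension point; control returns to the event loop
    return f"Hello, {name}"

def demo():
    coro = greet("world")                # nothing has run yet
    is_coro = asyncio.iscoroutine(coro)  # True: we only hold a coroutine object
    result = asyncio.run(coro)           # the event loop runs it to completion
    return is_coro, result

if __name__ == "__main__":
    print(demo())
```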
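  8. Thread pools

A thread pool keeps a bounded set of worker threads and reuses them, so each task runs on an existing worker instead of paying the cost of creating a new thread every time.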

from concurrent.futures import ThreadPoolExecutor
import time

def task(name):
    print(f"Task {name} started")
    time.sleep(2)
    print(f"Task {name} finished")

if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=2) as executor:
        executor.submit(task, "Task1")
        executor.submit(task, "Task2")

    print("All tasks completed")

Output:
Task Task1 started
Task Task2 started
Task Task1 finished
Task Task2 finished
All tasks completed
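
A thread pool can also be combined with asyncio: loop.run_in_executor hands a blocking call to a pool thread so that the event loop is not blocked while it runs.

import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

def blocking_io():
    print("Start blocking IO operation...")
    time.sleep(3)  # simulate a blocking operation
    return "Blocking IO result"

async def main():
    loop = asyncio.get_running_loop()
    # Run the blocking I/O operation in a thread pool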
    with ThreadPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, blocking_io)
        print(result)

asyncio.run(main())
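
The submit calls shown earlier discard the return values. When results are needed, one option is executor.map, sketched below with a hypothetical `fetch_length` function standing in for a blocking call.

```python
from concurrent.futures import ThreadPoolExecutor
import time

def fetch_length(text):
    time.sleep(0.1)  # stand-in for a blocking call
    return len(text)

def lengths(texts, max_workers=4):
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # map yields results in input order, even if workers finish out of order
        return list(executor.map(fetch_length, texts))

if __name__ == "__main__":
    print(lengths(["a", "bb", "ccc"]))
```

If results should be handled as soon as each one is ready, submit combined with concurrent.futures.as_completed is the usual alternative.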
  9. Process pools

A process pool is similar to a thread pool, except that it manages processes rather than threads.

from concurrent.futures import ProcessPoolExecutor
import time

def task(name):
    print(f"Task {name} started")
    time.sleep(2)
    print(f"Task {name} finished")

if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=2) as executor:
        executor.submit(task, "Task1")
        executor.submit(task, "Task2")

    print("All tasks completed")

Output:
Task Task1 started
Task Task2 started
Task Task1 finished
Task Task2 finished
All tasks completed
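  10. The event loop
    The event loop is the core of asynchronous programming. It schedules tasks and, whenever one task is waiting, switches to another task that is ready to run.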
import asyncio

async def task(name):
    print(f"Task {name} started")
    await asyncio.sleep(2)
    print(f"Task {name} finished")

async def main():
    task1 = asyncio.create_task(task("Task1"))
    task2 = asyncio.create_task(task("Task2"))

    await task1
    await task2

    print("All tasks completed")

if __name__ == "__main__":
    asyncio.run(main())

Output:
Task Task1 started
Task Task2 started
Task Task1 finished
Task Task2 finished
All tasks completed
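
To make the scheduling visible, the sketch below records the order in which two coroutines run. Each one yields to the event loop once with `asyncio.sleep(0)`. The `worker` coroutine and the log labels are invented for illustration.

```python
import asyncio

async def worker(name, log):
    log.append(f"{name}1")
    await asyncio.sleep(0)   # yield to the event loop once
    log.append(f"{name}2")

async def main():
    log = []
    # gather wraps both coroutines in tasks and schedules them in order
    await asyncio.gather(worker("a", log), worker("b", log))
    return log

if __name__ == "__main__":
    print(asyncio.run(main()))  # ['a1', 'b1', 'a2', 'b2']
```

The interleaved order shows the loop switching between tasks at each await, all on a single thread.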
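  11. Locks and condition variables
    Locks and condition variables are common synchronization mechanisms in concurrent programming. A lock protects a shared resource by preventing multiple threads from accessing it at the same time. A condition variable lets threads wait until some shared state changes and be notified when it does.

    A lock prevents multiple threads from modifying shared data at the same time.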
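import threading
import time

class Counter:
    """A plain mutable counter shared between threads."""
    def __init__(self):
        self.value = 0

def increment(counter, lock):
    with lock:
        local_counter = counter.value
        local_counter += 1
        time.sleep(0.1)  # simulate a time-consuming operation
        counter.value = local_counter
        print(f"Counter incremented to {counter.value}")

if __name__ == "__main__":
    # Shared state; every access goes through the lock
    counter = Counter()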
    lock = threading.Lock()
    threads = []

    for _ in range(10):
        thread = threading.Thread(target=increment, args=(counter, lock))
        threads.append(thread)
        thread.start()

    for thread in threads:
        thread.join()

    print(f"Final counter value: {counter.value}")

Output:
Counter incremented to 1
Counter incremented to 2
Counter incremented to 3
...
Counter incremented to 10
Final counter value: 10
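
A common way to package this idea is to keep the lock inside the object it protects, so callers cannot forget to take it. The `SafeCounter` class below is a hypothetical sketch of that design, not code from the original article.

```python
import threading

class SafeCounter:
    def __init__(self):
        self.value = 0
        self._lock = threading.Lock()

    def add(self, n=1):
        # The read-modify-write is done while holding the lock
        with self._lock:
            self.value += n

def count_with_threads(num_threads, per_thread):
    counter = SafeCounter()

    def work():
        for _ in range(per_thread):
            counter.add()

    threads = [threading.Thread(target=work) for _ in range(num_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return counter.value

if __name__ == "__main__":
    print(count_with_threads(8, 1000))  # 8000
```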

Condition variables

A condition variable lets one thread wait until another thread signals that a shared state has changed.

import threading
import time

class Counter:
    def __init__(self):
        self.value = 0
        self._lock = threading.Lock()
        self._condition = threading.Condition(self._lock)

    def increment(self):
        with self._lock:
            self.value += 1
            self._condition.notify_all()

    def wait_for_value(self, target_value):
        with self._condition:
            while self.value < target_value:
                self._condition.wait()
            print(f"Counter reached {target_value}")

def increment(counter):
    for _ in range(10):
        counter.increment()

def waiter(counter):
    for i in range(1, 11):
        counter.wait_for_value(i)

if __name__ == "__main__":
    counter = Counter()
    threads = []

    thread1 = threading.Thread(target=increment, args=(counter,))
    thread2 = threading.Thread(target=waiter, args=(counter,))

    threads.append(thread1)
    threads.append(thread2)

    for thread in threads:
        thread.start()

    for thread in threads:
        thread.join()

    print(f"Final counter value: {counter.value}")

Output:
Counter reached 1
Counter reached 2
...
Counter reached 10
Final counter value: 10
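
The explicit while loop around wait() can also be written with Condition.wait_for, which re-checks a predicate after every wake-up and accepts a timeout. The `Mailbox` class below is an illustrative sketch under that assumption.

```python
import threading

class Mailbox:
    def __init__(self):
        self._items = []
        self._cond = threading.Condition()

    def put(self, item):
        with self._cond:
            self._items.append(item)
            self._cond.notify()  # wake one waiting thread

    def take(self, timeout=None):
        with self._cond:
            # wait_for re-checks the predicate after every wake-up
            if not self._cond.wait_for(lambda: self._items, timeout=timeout):
                raise TimeoutError("no item arrived in time")
            return self._items.pop(0)

def demo():
    box = Mailbox()
    received = []
    consumer = threading.Thread(target=lambda: received.append(box.take(timeout=2)))
    consumer.start()
    box.put("hello")
    consumer.join()
    return received

if __name__ == "__main__":
    print(demo())  # ['hello']
```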
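  12. The producer-consumer pattern

The producer-consumer pattern is a classic concurrency pattern for handing data from the code that produces it to the code that consumes it.
A queue coordinates the producer and the consumer.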

import queue
import threading
import time

class Producer(threading.Thread):
    def __init__(self, queue):
        super().__init__()
        self.queue = queue

    def run(self):
        for i in range(10):
            item = f"Item {i}"
            self.queue.put(item)
            print(f"Produced {item}")
            time.sleep(1)

class Consumer(threading.Thread):
    def __init__(self, queue):
        super().__init__()
        self.queue = queue

    def run(self):
        while True:
            item = self.queue.get()
            if item is None:
                break
            print(f"Consumed {item}")
            time.sleep(0.5)
            self.queue.task_done()

if __name__ == "__main__":
    q = queue.Queue()
    producer = Producer(q)
    consumer = Consumer(q)

    producer.start()
    consumer.start()

    producer.join()
    q.join()

    q.put(None)
    consumer.join()

    print("All items consumed")

Output (typical; the exact interleaving may vary):
Produced Item 0
Consumed Item 0
Produced Item 1
Consumed Item 1
...
Produced Item 9
Consumed Item 9
All items consumed
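  13. Queues

A queue is a data structure commonly used in concurrent programming to pass data between producers and consumers.
Use queue.Queue to create a thread-safe queue.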

import queue
import threading
import time

def producer(queue):
    for i in range(10):
        item = f"Item {i}"
        queue.put(item)
        print(f"Produced {item}")
        time.sleep(1)

def consumer(queue):
    while True:
        item = queue.get()
        if item is None:
            break
        print(f"Consumed {item}")
        time.sleep(0.5)
        queue.task_done()

if __name__ == "__main__":
    q = queue.Queue()
    producer_thread = threading.Thread(target=producer, args=(q,))
    consumer_thread = threading.Thread(target=consumer, args=(q,))

    producer_thread.start()
    consumer_thread.start()

    producer_thread.join()
    q.join()

    q.put(None)
    consumer_thread.join()

    print("All items consumed")

Output (typical; the exact interleaving may vary):
Produced Item 0
Consumed Item 0
Produced Item 1
Consumed Item 1
...
Produced Item 9
Consumed Item 9
All items consumed
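
With more than one consumer, two refinements are often useful: a bounded queue, so that a fast producer is slowed down when consumers fall behind, and one stop signal per consumer. The sketch below assumes items are never `None`, since `None` serves as the stop signal here; all names are illustrative.

```python
import queue
import threading

SENTINEL = None  # tells a consumer to stop

def producer(q, items, num_consumers):
    for item in items:
        q.put(item)          # blocks while the queue is full (backpressure)
    for _ in range(num_consumers):
        q.put(SENTINEL)      # one stop signal per consumer

def consumer(q, out, lock):
    while True:
        item = q.get()
        if item is SENTINEL:
            break
        with lock:           # the output list is shared between consumers
            out.append(item * 2)

def run(items, num_consumers=3, maxsize=2):
    q = queue.Queue(maxsize=maxsize)
    out, lock = [], threading.Lock()
    consumers = [
        threading.Thread(target=consumer, args=(q, out, lock))
        for _ in range(num_consumers)
    ]
    for c in consumers:
        c.start()
    producer(q, items, num_consumers)
    for c in consumers:
        c.join()
    return sorted(out)

if __name__ == "__main__":
    print(run(list(range(10))))
```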
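  14. Semaphores

A semaphore limits how many tasks may run a section of code at the same time, which is useful when a resource is limited.
Use asyncio.Semaphore to cap the number of concurrent tasks.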

import asyncio
from asyncio import Semaphore

async def task(name, semaphore: Semaphore):
    async with semaphore:
        print(f"Task {name} started")
        await asyncio.sleep(2)
        print(f"Task {name} finished")

async def main():
    semaphore = Semaphore(value=2)

    tasks = [asyncio.create_task(task(f"Task{i}", semaphore)) for i in range(3)]

    await asyncio.gather(*tasks)

    print("All tasks completed")

if __name__ == "__main__":
    asyncio.run(main())

Output:
Task Task0 started
Task Task1 started
Task Task0 finished
Task Task1 finished
Task Task2 started
Task Task2 finished
All tasks completed
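
One way to check that the limit actually holds is to track how many tasks are inside the guarded section at once. The following sketch does this with a small `state` dictionary; the helper names are hypothetical.

```python
import asyncio

async def limited(sem, state):
    async with sem:
        state["active"] += 1
        state["peak"] = max(state["peak"], state["active"])
        await asyncio.sleep(0.05)   # simulated work while holding a slot
        state["active"] -= 1

async def peak_concurrency(num_tasks, limit):
    sem = asyncio.Semaphore(limit)
    state = {"active": 0, "peak": 0}
    await asyncio.gather(*(limited(sem, state) for _ in range(num_tasks)))
    return state["peak"]

if __name__ == "__main__":
    print(asyncio.run(peak_concurrency(6, 2)))  # 2
```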
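  15. Events

An event is another synchronization primitive, used to signal other threads or coroutines.
Here, asyncio.Event is used to notify another coroutine.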

import asyncio

async def waiter(event):
    print("Waiting for event...")
    await event.wait()
    print("Event received!")

async def main():
    event = asyncio.Event()

    waiter_task = asyncio.create_task(waiter(event))

    await asyncio.sleep(3)
    print("Setting event...")
    event.set()

    await waiter_task

    print("All tasks completed")

if __name__ == "__main__":
    asyncio.run(main())

Output:
Waiting for event...
Setting event...
Event received!
All tasks completed
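
The same idea exists for threads as threading.Event. The sketch below is a thread-based counterpart of the example above; the `demo` function and the log labels are made up for illustration.

```python
import threading

def demo():
    ready = threading.Event()
    log = []

    def waiter():
        ready.wait()            # blocks until another thread calls set()
        log.append("received")

    t = threading.Thread(target=waiter)
    t.start()
    log.append("setting")
    ready.set()                 # wakes every thread waiting on the event
    t.join()
    return log

if __name__ == "__main__":
    print(demo())  # ['setting', 'received']
```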
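  16. Future and Task

Future and Task are key concepts in asynchronous programming; both represent the eventual result of an asynchronous operation.
Future
Use asyncio.Future to represent the result of an asynchronous operation.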

import asyncio

async def task():
    print("Task started")
    await asyncio.sleep(2)
    print("Task finished")
    return "Task result"

async def main():
    future = asyncio.Future()

    async def run_task():
        result = await task()
        future.set_result(result)

    task_task = asyncio.create_task(run_task())

    await task_task

    print(f"Future result: {future.result()}")

if __name__ == "__main__":
    asyncio.run(main())

Output:
Task started
Task finished
Future result: Task result
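
A Future is most useful as a bridge from callback-style code: a plain callback sets the result, and a coroutine awaits it. Here is a minimal sketch, assuming a timer callback scheduled with loop.call_later plays the role of the external event.

```python
import asyncio

async def main():
    loop = asyncio.get_running_loop()
    future = loop.create_future()
    # A plain callback (not a coroutine) fulfils the future after 0.1 seconds
    loop.call_later(0.1, future.set_result, "callback result")
    # Awaiting the future suspends main until set_result is called
    return await future

if __name__ == "__main__":
    print(asyncio.run(main()))  # callback result
```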

Task
Use asyncio.create_task to create a task.

import asyncio

async def task(name):
    print(f"Task {name} started")
    await asyncio.sleep(2)
    print(f"Task {name} finished")
    return f"Task {name} result"

async def main():
    task1 = asyncio.create_task(task("Task1"))
    task2 = asyncio.create_task(task("Task2"))

    results = await asyncio.gather(task1, task2)

    print(results)

if __name__ == "__main__":
    asyncio.run(main())

Output:
Task Task1 started
Task Task2 started
Task Task1 finished
Task Task2 finished
['Task Task1 result', 'Task Task2 result']
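  17. Advanced concurrency techniques

Beyond the basic patterns, a few more advanced techniques help manage concurrent tasks.
Timeout handling
Use asyncio.wait_for to set a timeout.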

import asyncio

async def task(name):
    print(f"Task {name} started")
    await asyncio.sleep(3)
    print(f"Task {name} finished")
    return f"Task {name} result"

async def main():
    try:
        result = await asyncio.wait_for(task("Task1"), timeout=2)
        print(f"Result: {result}")
    except asyncio.TimeoutError:
        print("Task timed out")

if __name__ == "__main__":
    asyncio.run(main())

Output:
Task Task1 started
Task timed out
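
In practice a timeout is often paired with a fallback value instead of just a message. The `with_default` helper below is a hypothetical wrapper around asyncio.wait_for that sketches this.

```python
import asyncio

async def slow(delay, value):
    await asyncio.sleep(delay)
    return value

async def with_default(awaitable, timeout, default):
    # Return the awaited result, or the default if the timeout expires first
    try:
        return await asyncio.wait_for(awaitable, timeout=timeout)
    except asyncio.TimeoutError:
        return default

async def main():
    fast = await with_default(slow(0.01, "ok"), timeout=1, default="fallback")
    late = await with_default(slow(1, "ok"), timeout=0.05, default="fallback")
    return fast, late

if __name__ == "__main__":
    print(asyncio.run(main()))  # ('ok', 'fallback')
```

When the timeout expires, wait_for also cancels the inner operation, so it does not keep running in the background.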

Task cancellation
Call cancel() on an asyncio.Task to cancel it.

import asyncio

async def task(name):
    print(f"Task {name} started")
    await asyncio.sleep(3)
    print(f"Task {name} finished")
    return f"Task {name} result"

async def main():
    task1 = asyncio.create_task(task("Task1"))

    await asyncio.sleep(1)
    task1.cancel()

    try:
        await task1
    except asyncio.CancelledError:
        print("Task was cancelled")

    print("Main completed")

if __name__ == "__main__":
    asyncio.run(main())

Output:
Task Task1 started
Task was cancelled
Main completed
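
Cancellation is delivered to the task as a CancelledError raised at its current await, which gives the task a chance to clean up. The sketch below, with an invented `worker` coroutine, shows one way to do that while still letting the cancellation propagate.

```python
import asyncio

async def worker(log):
    try:
        log.append("started")
        await asyncio.sleep(10)
        log.append("finished")        # not reached when cancelled
    except asyncio.CancelledError:
        log.append("cancelled")
        raise                         # re-raise so the task is marked as cancelled
    finally:
        log.append("cleaned up")      # runs on both normal exit and cancellation

async def main():
    log = []
    task = asyncio.create_task(worker(log))
    await asyncio.sleep(0.05)         # let the worker start
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return log, task.cancelled()

if __name__ == "__main__":
    print(asyncio.run(main()))
```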

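Case study: a concurrent crawler
Suppose we need to fetch data from several websites. With asynchronous I/O the requests can run concurrently, which speeds up the crawl.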

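import aiohttp
import asyncio

async def fetch(session, url):
    async with session.get(url) as response:
        return await response.text()

def write_text(data, filename):
    with open(filename, "w", encoding="utf-8") as file:
        file.write(data)

async def save_data(data, filename):
    # File writes are blocking, so run them in a worker thread (Python 3.9+)
    await asyncio.to_thread(write_text, data, filename)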

async def main():
    urls = [
        "https://example.com/data1",
        "https://example.com/data2",
        "https://example.com/data3"
    ]
    filenames = [f"data{i}.txt" for i in range(3)]

    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url) for url in urls]
        data = await asyncio.gather(*tasks)

    await asyncio.gather(*[save_data(d, filename) for d, filename in zip(data, filenames)])

    print("All data fetched and saved")

if __name__ == "__main__":
    asyncio.run(main())

Output:
All data fetched and saved
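
A real crawler usually also needs a cap on simultaneous requests and a way to survive individual failures. The sketch below combines a semaphore with gather(return_exceptions=True). To stay self-contained it uses a hypothetical `fake_fetch` coroutine in place of the aiohttp call, so no network access is involved.

```python
import asyncio

async def fake_fetch(url):
    # Hypothetical stand-in for session.get(url); no network access
    await asyncio.sleep(0.01)
    if "bad" in url:
        raise ValueError(f"cannot fetch {url}")
    return f"content of {url}"

async def fetch_limited(url, sem):
    async with sem:               # at most `limit` fetches run at once
        return await fake_fetch(url)

async def crawl(urls, limit=2):
    sem = asyncio.Semaphore(limit)
    results = await asyncio.gather(
        *(fetch_limited(u, sem) for u in urls),
        return_exceptions=True,   # failures come back as exception objects
    )
    ok = {u: r for u, r in zip(urls, results) if not isinstance(r, Exception)}
    failed = [u for u, r in zip(urls, results) if isinstance(r, Exception)]
    return ok, failed

if __name__ == "__main__":
    urls = [
        "https://example.com/a",
        "https://example.com/bad",
        "https://example.com/c",
    ]
    print(asyncio.run(crawl(urls)))
```

Without return_exceptions=True, the first failing request would raise out of gather and the results of the other requests would be lost to the caller.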
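This article is a contributed submission and does not represent the views of 美熙智能. If you repost it, please credit the original author and the source: https://www.icnma.com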