深入理解Python协程:asyncio、异步并发、事件循环

发布于:2025-06-14 ⋅ 阅读:(19) ⋅ 点赞:(0)

概念解析

异步编程是一种允许程序在等待 I/O 操作如网络请求、文件读写)时不被阻塞,转而执行其他任务的编程范式。

在 Python 中,其核心实现概念如下:

协程(Coroutines)

  • 定义:可暂停执行并在合适时机恢复的函数,通过async def关键字声明。
  • 特点:非抢占式调度,协程主动通过await让出执行权,适用于处理高并发 I/O 场景。

事件循环(Event Loop)

  • 定义:异步编程的 “调度器”,负责管理协程的执行顺序、监控 I/O 事件状态,并在事件就绪时恢复对应协程。
  • 核心逻辑:循环检查可执行的协程任务,按事件触发顺序调度执行。

任务(Tasks)

  • 定义:对协程的封装,代表一个独立的异步操作单元,通过asyncio.create_task()创建。
  • 功能:支持取消任务、等待任务完成(await task)或获取任务状态。

Future 对象

  • 定义:表示一个尚未完成的异步操作结果,本质是协程间传递状态的载体。
  • 作用:当协程需要等待某个异步操作的结果时,可通过await Future暂停执行,待操作完成后获取结果。

await 表达式

  • 定义:协程中用于暂停执行的关键字,用于等待另一个协程、Task 或 Future 对象的完成。
  • 示例:result = await async_function(),表示暂停当前协程,直到async_function执行完毕并返回结果。

应用场景

异步编程尤其适用于I/O 密集型任务如 HTTP 请求、数据库查询、文件操作等)。
通过减少线程切换开销和 CPU 闲置时间,显著提升程序的并发处理能力。
相比多线程编程,异步编程在高并发场景下更轻量、更高效。


asyncio库详解

Python 3.4 引入了 asyncio 库,作为异步编程的核心组件,事件循环是 asyncio 的核心。
Windows 上使用 ProactorEventLoop,在 Unix 上使用 SelectorEventLoop

Unix系统(SelectorEventLoop)

  • 基于selectors 模块对底层 I/O 多路复用机制(如select、poll、epoll、kqueue)的抽象。
  • 采用 “就绪通知” 机制:监视文件描述符(如套接字)状态,当 I/O 操作就绪时(如可读 / 可写)通知应用程序。
  • 优势场景1:擅长处理大量并发连接(如 Linuxepoll机制支持高效事件驱动)。
  • 优势场景2:适用于网络服务器、高并发 I/O 场景。

Windows系统(ProactorEventLoop)

  • 基于 Windows 专有 I/O 完成端口(IOCP)机制,属于系统级异步 I/O 框架。
  • 采用 “完成通知” 机制:异步启动 I/O 操作,操作完成后由系统主动通知程序。
  • 优势场景1:深度优化 Windows 平台特性,支持全类型异步 I/O 操作(含文件 I/O)。
  • 优势场景1:在 Windows 环境下性能通常优于 SelectorEventLoop

两种事件循环的关键差异

维度 SelectorEventLoop(Unix) ProactorEventLoop(Windows)
通知机制 等待 I/O 就绪后执行操作(“询问式”:Can I read/write? 异步启动 I/O 操作,完成后被动接收通知(“回调式”:Notify when done
API 覆盖范围 部分平台可能不支持全类型文件 I/O 操作 原生支持 Windows 所有异步 I/O 操作(如管道、套接字、文件)
性能特点 Unix 系统下,依赖epoll/kqueue等机制实现高效并发 Windows 下利用 IOCP 机制实现低延迟、高吞吐量

事件循环管理

import asyncio

# 获取事件循环
loop = asyncio.get_event_loop()

# 运行协程直到完成
loop.run_until_complete(my_coroutine())

# 运行事件循环直到stop()被调用
loop.run_forever()

# 关闭事件循环
loop.close()

协程定义与执行

async def fetch_data(url):
    print(f"开始获取数据: {url}")
    await asyncio.sleep(2)  # 模拟I/O操作
    print(f"数据获取完成: {url}")
    return f"来自 {url} 的数据"

# Python 3.7+ 推荐方式
async def main():
    result = await fetch_data("example.com")
    print(result)

asyncio.run(main())  # Python 3.7+引入,简化了事件循环管理

任务创建与管理

async def main():
    # 创建任务
    task1 = asyncio.create_task(fetch_data("site1.com"))
    task2 = asyncio.create_task(fetch_data("site2.com"))
    
    # 等待所有任务完成
    results = await asyncio.gather(task1, task2)
    print(results)
    
    # 并发运行多个协程
    results = await asyncio.gather(
        fetch_data("site3.com"),
        fetch_data("site4.com")
    )

超时管理

async def main():
    try:
        # 设置超时
        result = await asyncio.wait_for(fetch_data("example.com"), timeout=1.0)
    except asyncio.TimeoutError:
        print("操作超时")

同步与异步代码结合

import concurrent.futures

def cpu_bound_task(x):
    # 计算密集型任务
    return x * x

async def main():
    # 使用线程池执行阻塞I/O
    loop = asyncio.get_running_loop()
    with concurrent.futures.ThreadPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, cpu_bound_task, 10)
        print(result)

高并发场景实战案例

案例1: 并发网络请求

import asyncio
import aiohttp
import time

async def fetch(session, url):
    async with session.get(url) as response:
        return await response.text()

async def fetch_all(urls):
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        return results

# 测试URLs
urls = [
    "https://www.google.com",
    "https://www.github.com",
    "https://www.python.org",
    # 可添加更多URL
] * 5  # 重复请求以增加数量

async def main():
    start = time.time()
    results = await fetch_all(urls)
    end = time.time()
    print(f"获取了 {len(results)} 个页面,耗时: {end - start:.2f} 秒")

# 运行
asyncio.run(main())

案例2: 异步数据库操作

使用asyncpg进行PostgreSQL异步操作

import asyncio
import asyncpg

async def create_tables(conn):
    await conn.execute('''
        CREATE TABLE IF NOT EXISTS users(
            id SERIAL PRIMARY KEY,
            name TEXT,
            email TEXT
        )
    ''')

async def insert_users(conn, users):
    # 批量插入
    await conn.executemany(
        'INSERT INTO users(name, email) VALUES($1, $2)',
        users
    )

async def fetch_users(conn):
    return await conn.fetch('SELECT * FROM users')

async def main():
    # 连接数据库
    conn = await asyncpg.connect(
        user='postgres',
        password='password',
        database='testdb',
        host='127.0.0.1'
    )
    
    # 创建表
    await create_tables(conn)
    
    # 生成测试数据
    test_users = [
        ('User1', 'user1@example.com'),
        ('User2', 'user2@example.com'),
        ('User3', 'user3@example.com'),
    ]
    
    # 插入数据
    await insert_users(conn, test_users)
    
    # 查询数据
    users = await fetch_users(conn)
    for user in users:
        print(f"ID: {user['id']}, Name: {user['name']}, Email: {user['email']}")
    
    # 关闭连接
    await conn.close()

# 运行
asyncio.run(main())

案例3: 异步Web爬虫

import asyncio
import aiohttp
from bs4 import BeautifulSoup
import time

async def fetch(session, url):
    async with session.get(url) as response:
        return await response.text()

async def parse(html):
    # 使用BeautifulSoup解析HTML
    soup = BeautifulSoup(html, 'html.parser')
    # 获取所有链接
    links = [a.get('href') for a in soup.find_all('a') if a.get('href')]
    return links

async def crawl(url, max_depth=2):
    visited = set()
    
    async def _crawl(current_url, depth):
        if depth > max_depth or current_url in visited:
            return
        
        visited.add(current_url)
        print(f"正在爬取: {current_url}")
        
        try:
            async with aiohttp.ClientSession() as session:
                html = await fetch(session, current_url)
                links = await parse(html)
                
                # 过滤出同域名链接
                base_url = '/'.join(current_url.split('/')[:3])
                same_domain_links = [
                    link if link.startswith('http') else f"{base_url}{link}"
                    for link in links if link and (link.startswith('http') or link.startswith('/'))
                ]
                
                # 创建子任务继续爬取
                tasks = [
                    _crawl(link, depth + 1) 
                    for link in same_domain_links[:5]  # 限制每页最多爬5个链接
                ]
                await asyncio.gather(*tasks)
        except Exception as e:
            print(f"爬取 {current_url} 出错: {e}")
    
    await _crawl(url, 0)
    return visited

async def main():
    start = time.time()
    visited = await crawl("https://python.org", max_depth=1)
    end = time.time()
    print(f"爬取了 {len(visited)} 个页面,耗时: {end - start:.2f} 秒")

# 运行
asyncio.run(main())

案例4: 异步API服务器处理大量并发请求

使用FastAPI构建高并发API服务

from fastapi import FastAPI, BackgroundTasks
import asyncio
import uvicorn
import time
import random

app = FastAPI()

# 模拟数据库
db = {}

# 模拟异步数据库操作
async def db_operation(key, delay=None):
    if delay is None:
        delay = random.uniform(0.1, 0.5)  # 随机延迟模拟真实场景
    await asyncio.sleep(delay)
    return db.get(key)

# 模拟耗时任务
async def process_item(item_id):
    print(f"开始处理项目 {item_id}")
    await asyncio.sleep(5)  # 模拟耗时操作
    print(f"项目 {item_id} 处理完成")
    return {"item_id": item_id, "status": "processed"}

# 常规端点
@app.get("/items/{item_id}")
async def read_item(item_id: str):
    result = await db_operation(item_id)
    return {"item_id": item_id, "value": result}

# 批量操作端点
@app.get("/batch")
async def batch_operation(items: str):
    item_ids = items.split(",")
    tasks = [db_operation(item_id) for item_id in item_ids]
    results = await asyncio.gather(*tasks)
    return dict(zip(item_ids, results))

# 后台任务
@app.post("/items/{item_id}/process")
async def process(item_id: str, background_tasks: BackgroundTasks):
    background_tasks.add_task(process_item, item_id)
    return {"message": f"Processing item {item_id} in the background"}

# 负载测试端点
@app.get("/load-test/{count}")
async def load_test(count: int):
    start = time.time()
    
    # 创建多个并发任务
    tasks = []
    for i in range(count):
        # 随机延迟
        delay = random.uniform(0.1, 0.5)
        tasks.append(asyncio.sleep(delay))
    
    # 并发执行所有任务
    await asyncio.gather(*tasks)
    
    end = time.time()
    return {
        "tasks_completed": count,
        "time_taken": f"{end - start:.2f} 秒",
        "tasks_per_second": f"{count / (end - start):.2f}"
    }

# 初始化一些测试数据
@app.on_event("startup")
async def startup_event():
    for i in range(1000):
        db[str(i)] = f"value-{i}"

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)