异步编程实战指南
目录
- 简介
- 同步与异步编程概述
- Python中的异步编程基础
- 使用
asyncio
进行异步I/O - 异步编程模式
- 异步编程在后端开发中的应用
- 实战项目示例
- 最佳实践
- 常见陷阱与避免方法
- 工具与库
- 总结与进一步学习
简介
异步编程 是一种编程范式,允许程序在等待某些操作完成(如I/O操作)时继续执行其他任务,从而提高程序的性能和响应性。特别是在后端开发中,异步编程可以显著提升服务器的并发处理能力,减少资源浪费。
本指南将通过理论讲解与实际示例相结合的方式,帮助您系统地学习和掌握Python中的异步编程,并在后端开发中灵活应用。
同步与异步编程概述
同步编程
在同步编程中,代码按顺序执行,每个任务必须等待前一个任务完成后才能开始。这种方式简单直观,但在处理I/O密集型任务时效率较低,因为程序会在等待I/O操作完成时闲置。
示例:
import time
def fetch_data():
print("开始获取数据...")
time.sleep(2) # 模拟I/O操作
print("数据获取完成。")
return "数据"
def process_data(data):
print(f"开始处理数据: {data}")
time.sleep(1) # 模拟计算操作
print("数据处理完成。")
def main():
data = fetch_data()
process_data(data)
if __name__ == "__main__":
main()
输出:
开始获取数据...
数据获取完成。
开始处理数据: 数据
数据处理完成。
异步编程
异步编程允许程序在等待某些操作完成时继续执行其他任务,避免了不必要的等待时间。通过事件循环管理多个任务,实现高效的并发处理。
异步编程的优点:
- 高并发:同时处理多个任务,提高资源利用率。
- 响应性强:减少等待时间,提高程序的响应速度。
- 适用于I/O密集型任务:如网络请求、文件读写等。
Python中的异步编程基础
Python通过asyncio
库提供了对异步编程的支持。以下是异步编程中的核心概念:
事件循环
事件循环是异步编程的核心,它负责调度和执行协程。当一个协程遇到I/O操作时,事件循环可以暂停该协程,去执行其他任务,待I/O操作完成后再恢复执行。
示例:
import asyncio
async def say_hello():
print("Hello")
await asyncio.sleep(1)
print("World")
async def main():
await say_hello()
# 运行事件循环
asyncio.run(main())
输出:
Hello
World
协程
协程是异步编程的基本单位,类似于线程,但更加轻量。协程由async def
定义,并使用await
关键字等待其他协程的完成。
示例:
import asyncio
async def greet(name):
print(f"Hello, {name}!")
await asyncio.sleep(1)
print(f"Goodbye, {name}!")
async def main():
await greet("Alice")
asyncio.run(main())
输出:
Hello, Alice!
Goodbye, Alice!
任务与未来对象
任务是事件循环中调度的协程,可以并发执行。未来对象(Future)代表尚未完成的操作的结果。
示例:
import asyncio
async def fetch_data():
await asyncio.sleep(2)
return "数据"
async def main():
task = asyncio.create_task(fetch_data())
print("等待数据...")
data = await task
print(f"获取到的数据: {data}")
asyncio.run(main())
输出:
等待数据...
获取到的数据: 数据
使用 asyncio
进行异步I/O
asyncio
是Python中用于编写并发代码的标准库。以下是asyncio
的一些常用功能和用法。
基本用法
定义协程函数并在事件循环中运行。
示例:
import asyncio
async def say(message, delay):
await asyncio.sleep(delay)
print(message)
async def main():
await say("Hello after 1 second", 1)
await say("Hello after 2 seconds", 2)
asyncio.run(main())
输出:
Hello after 1 second
Hello after 2 seconds
并发执行
使用asyncio.gather
并发执行多个协程。
示例:
import asyncio
async def say(message, delay):
await asyncio.sleep(delay)
print(message)
async def main():
await asyncio.gather(
say("Hello after 1 second", 1),
say("Hello after 2 seconds", 2),
say("Hello after 3 seconds", 3)
)
asyncio.run(main())
输出:
Hello after 1 second
Hello after 2 seconds
Hello after 3 seconds
异步上下文管理器
在异步环境中使用上下文管理器,如打开异步文件。
示例:
import asyncio
async def read_file():
async with aiofiles.open('example.txt', 'r') as f:
contents = await f.read()
print(contents)
async def main():
await read_file()
asyncio.run(main())
注意: 需要安装aiofiles
库来支持异步文件操作。
pip install aiofiles
示例:异步HTTP请求
使用aiohttp
库进行异步HTTP请求。
示例:
import asyncio
import aiohttp
async def fetch(session, url):
async with session.get(url) as response:
return await response.text()
async def main():
async with aiohttp.ClientSession() as session:
html = await fetch(session, 'https://www.example.com')
print(html)
asyncio.run(main())
解释:
- aiohttp.ClientSession:用于创建会话,管理连接池。
- async with session.get(url):发送异步GET请求,并等待响应。
异步编程模式
在实际开发中,常见的异步编程模式包括生产者-消费者、并发请求和处理阻塞代码等。
生产者-消费者
生产者-消费者模式用于处理数据流,其中生产者生成数据,消费者处理数据。
示例:
import asyncio
import random
async def producer(queue, n):
for i in range(n):
await asyncio.sleep(random.uniform(0.1, 1.0))
item = f"item-{i}"
await queue.put(item)
print(f"生产者: 生产了 {item}")
await queue.put(None) # 发送结束信号
async def consumer(queue):
while True:
item = await queue.get()
if item is None:
break
await asyncio.sleep(random.uniform(0.1, 1.0))
print(f"消费者: 消费了 {item}")
print("消费者: 结束")
async def main():
queue = asyncio.Queue()
await asyncio.gather(
producer(queue, 5),
consumer(queue)
)
asyncio.run(main())
输出:
生产者: 生产了 item-0
消费者: 消费了 item-0
生产者: 生产了 item-1
消费者: 消费了 item-1
生产者: 生产了 item-2
消费者: 消费了 item-2
生产者: 生产了 item-3
消费者: 消费了 item-3
生产者: 生产了 item-4
消费者: 消费了 item-4
消费者: 结束
并发请求
同时处理多个I/O操作,提高效率。
示例:
import asyncio
import aiohttp
async def fetch(session, url):
async with session.get(url) as response:
return await response.text()
async def main():
urls = [
'https://www.example.com',
'https://www.python.org',
'https://www.github.com'
]
async with aiohttp.ClientSession() as session:
tasks = [fetch(session, url) for url in urls]
results = await asyncio.gather(*tasks)
for url, content in zip(urls, results):
print(f"内容来自: {url}, 长度: {len(content)}")
asyncio.run(main())
输出:
内容来自: https://www.example.com, 长度: 1256
内容来自: https://www.python.org, 长度: 4885
内容来自: https://www.github.com, 长度: 10456
处理阻塞代码
将阻塞代码转移到线程或进程中,以避免阻塞事件循环。
示例:
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
def blocking_io():
print("开始阻塞I/O操作...")
time.sleep(3)
print("阻塞I/O操作完成。")
return "结果"
async def main():
loop = asyncio.get_running_loop()
with ThreadPoolExecutor() as pool:
result = await loop.run_in_executor(pool, blocking_io)
print(f"获得的结果: {result}")
asyncio.run(main())
输出:
开始阻塞I/O操作...
阻塞I/O操作完成。
获得的结果: 结果
解释:
- run_in_executor:将阻塞函数转移到线程池执行,避免阻塞事件循环。
- ThreadPoolExecutor:创建线程池,管理线程资源。
异步编程在后端开发中的应用
异步编程在后端开发中有广泛的应用,特别是在需要高并发和实时处理的场景。以下是一些典型的应用场景及其实现方法。
Web框架
FastAPI和aiohttp是两个支持异步编程的Python Web框架。
示例:使用 FastAPI 构建异步API
from fastapi import FastAPI
import asyncio
app = FastAPI()
@app.get("/async")
async def async_endpoint():
await asyncio.sleep(1)
return {"message": "这是一个异步端点"}
运行应用:
uvicorn main:app --reload
数据库访问
使用支持异步操作的ORM,如SQLAlchemy的异步支持或Tortoise ORM。
示例:使用SQLAlchemy异步访问数据库
import asyncio
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
from sqlalchemy import Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True, index=True)
name = Column(String, index=True)
DATABASE_URL = "sqlite+aiosqlite:///./test.db"
engine = create_async_engine(DATABASE_URL, echo=True)
async_session = sessionmaker(
bind=engine, class_=AsyncSession, expire_on_commit=False
)
async def init_db():
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
async def create_user(name: str):
async with async_session() as session:
new_user = User(name=name)
session.add(new_user)
await session.commit()
await session.refresh(new_user)
return new_user
async def get_users():
async with async_session() as session:
result = await session.execute("SELECT * FROM users")
users = result.fetchall()
return users
async def main():
await init_db()
await create_user("Alice")
users = await get_users()
for user in users:
print(user)
asyncio.run(main())
解释:
- create_async_engine:创建异步数据库引擎。
- AsyncSession:使用异步会话处理数据库操作。
- await session.commit():异步提交事务。
外部API调用
使用aiohttp
或httpx
进行异步HTTP请求,提高并发处理能力。
示例:使用 httpx
进行异步HTTP请求
import asyncio
import httpx
async def fetch(url):
async with httpx.AsyncClient() as client:
response = await client.get(url)
return response.text
async def main():
urls = [
'https://www.example.com',
'https://www.python.org',
'https://www.github.com'
]
tasks = [fetch(url) for url in urls]
responses = await asyncio.gather(*tasks)
for url, content in zip(urls, responses):
print(f"内容来自: {url}, 长度: {len(content)}")
asyncio.run(main())
输出:
内容来自: https://www.example.com, 长度: 1256
内容来自: https://www.python.org, 长度: 4885
内容来自: https://www.github.com, 长度: 10456
实时通信
使用WebSocket实现实时双向通信,如聊天应用、实时数据更新等。
示例:使用 FastAPI 构建WebSocket端点
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
app = FastAPI()
@app.websocket("/ws/chat")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
try:
while True:
data = await websocket.receive_text()
await websocket.send_text(f"Echo: {data}")
except WebSocketDisconnect:
print("WebSocket断开连接")
运行应用:
uvicorn main:app --reload
测试:
可以使用WebSocket客户端工具(如 websocat、Postman 或浏览器扩展)连接到 ws://localhost:8000/ws/chat
,发送消息并接收回显。
后台任务
使用异步任务队列处理耗时任务,如发送邮件、数据处理等。
示例:使用FastAPI的后台任务
from fastapi import FastAPI, BackgroundTasks
app = FastAPI()
def write_log(message: str):
with open("log.txt", "a") as log_file:
log_file.write(message + "\n")
@app.post("/send-notification/")
async def send_notification(message: str, background_tasks: BackgroundTasks):
background_tasks.add_task(write_log, message)
return {"message": "通知已发送"}
解释:
- BackgroundTasks:允许将任务添加到后台任务队列中,异步执行而不阻塞主线程。
运行应用:
uvicorn main:app --reload
实战项目示例
通过一个简单的异步Web服务器和WebSocket聊天室应用,来演示异步编程在后端开发中的实际应用。
1. 简单的异步Web服务器
构建一个使用FastAPI的异步Web服务器,支持基本的CRUD操作。
项目结构:
async_web_server/
├── app/
│ ├── __init__.py
│ ├── main.py
│ ├── models.py
│ ├── schemas.py
│ └── crud.py
├── requirements.txt
└── README.md
文件内容:
requirements.txt
fastapi uvicorn[standard] sqlalchemy databases pydantic
app/models.py
from sqlalchemy import Column, Integer, String from sqlalchemy.ext.declarative import declarative_base Base = declarative_base() class Item(Base): __tablename__ = "items" id = Column(Integer, primary_key=True, index=True) name = Column(String, index=True) description = Column(String, index=True)
app/schemas.py
from pydantic import BaseModel class ItemBase(BaseModel): name: str description: str class ItemCreate(ItemBase): pass class Item(ItemBase): id: int class Config: orm_mode = True
app/crud.py
from sqlalchemy.orm import Session from . import models, schemas def get_item(db: Session, item_id: int): return db.query(models.Item).filter(models.Item.id == item_id).first() def get_items(db: Session, skip: int = 0, limit: int = 10): return db.query(models.Item).offset(skip).limit(limit).all() def create_item(db: Session, item: schemas.ItemCreate): db_item = models.Item(name=item.name, description=item.description) db.add(db_item) db.commit() db.refresh(db_item) return db_item
app/main.py
from fastapi import FastAPI, Depends, HTTPException from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker, Session from . import models, schemas, crud DATABASE_URL = "sqlite:///./test.db" engine = create_engine(DATABASE_URL, connect_args={"check_same_thread": False}) SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) models.Base.metadata.create_all(bind=engine) app = FastAPI() def get_db(): db = SessionLocal() try: yield db finally: db.close() @app.post("/items/", response_model=schemas.Item) async def create_item(item: schemas.ItemCreate, db: Session = Depends(get_db)): return crud.create_item(db=db, item=item) @app.get("/items/{item_id}", response_model=schemas.Item) async def read_item(item_id: int, db: Session = Depends(get_db)): db_item = crud.get_item(db, item_id=item_id) if db_item is None: raise HTTPException(status_code=404, detail="Item not found") return db_item @app.get("/items/", response_model=list[schemas.Item]) async def read_items(skip: int = 0, limit: int = 10, db: Session = Depends(get_db)): items = crud.get_items(db, skip=skip, limit=limit) return items
运行应用:
uvicorn app.main:app --reload
测试API:
创建项目:发送POST请求到
http://127.0.0.1:8000/items/
,Body:
{ "name": "Item1", "description": "This is item 1." }
获取项目:发送GET请求到
http://127.0.0.1:8000/items/1
获取所有项目:发送GET请求到
http://127.0.0.1:8000/items/
2. WebSocket聊天室应用
构建一个简单的WebSocket聊天室,允许多个客户端实时发送和接收消息。
项目结构:
websocket_chat/
├── app/
│ ├── __init__.py
│ ├── main.py
│ └── manager.py
├── requirements.txt
└── README.md
文件内容:
requirements.txt
fastapi uvicorn[standard]
app/manager.py
from typing import List from fastapi import WebSocket class ConnectionManager: def __init__(self): self.active_connections: List[WebSocket] = [] async def connect(self, websocket: WebSocket): await websocket.accept() self.active_connections.append(websocket) def disconnect(self, websocket: WebSocket): self.active_connections.remove(websocket) async def broadcast(self, message: str): for connection in self.active_connections: await connection.send_text(message)
app/main.py
from fastapi import FastAPI, WebSocket, WebSocketDisconnect from .manager import ConnectionManager app = FastAPI() manager = ConnectionManager() @app.websocket("/ws/chat") async def websocket_endpoint(websocket: WebSocket): await manager.connect(websocket) try: while True: data = await websocket.receive_text() await manager.broadcast(f"客户端说: {data}") except WebSocketDisconnect: manager.disconnect(websocket) await manager.broadcast("一个客户端已离开聊天室。")
运行应用:
uvicorn app.main:app --reload
测试聊天室:
可以使用WebSocket客户端工具(如 websocat、Postman 或浏览器扩展)连接到 ws://localhost:8000/ws/chat
,发送消息并接收广播消息。
最佳实践
1. 避免阻塞调用
在异步代码中,任何阻塞操作(如CPU密集型任务或阻塞I/O)都会阻塞整个事件循环,导致性能下降。应尽量使用异步库或将阻塞操作转移到线程或进程中。
示例:将阻塞操作转移到线程池
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
def blocking_task():
print("开始阻塞任务...")
time.sleep(3)
print("阻塞任务完成。")
return "结果"
async def main():
loop = asyncio.get_running_loop()
with ThreadPoolExecutor() as pool:
result = await loop.run_in_executor(pool, blocking_task)
print(f"获得结果: {result}")
asyncio.run(main())
2. 正确处理异常
在异步代码中,异常处理同样重要。使用try-except
块捕捉并处理异常,避免未处理的异常导致程序崩溃。
示例:
import asyncio
async def faulty_coroutine():
await asyncio.sleep(1)
raise ValueError("这是一个错误")
async def main():
try:
await faulty_coroutine()
except ValueError as e:
print(f"捕捉到异常: {e}")
asyncio.run(main())
输出:
捕捉到异常: 这是一个错误
3. 资源管理
确保正确管理资源,如数据库连接、文件句柄等。使用异步上下文管理器(async with
)来自动处理资源的打开和关闭。
示例:
import aiofiles
import asyncio
async def read_file(file_path):
async with aiofiles.open(file_path, 'r') as f:
contents = await f.read()
print(contents)
async def main():
await read_file('example.txt')
asyncio.run(main())
4. 测试异步代码
使用适当的测试工具(如pytest-asyncio
)测试异步代码,确保其正确性和稳定性。
示例:
# tests/test_async.py
import pytest
import asyncio
@pytest.mark.asyncio
async def test_async_function():
async def sample_coroutine():
await asyncio.sleep(1)
return "成功"
result = await sample_coroutine()
assert result == "成功"
运行测试:
pytest tests/
5. 使用异步库
尽量使用支持异步的第三方库,如aiohttp
、httpx
、aioredis
等,避免在异步环境中使用阻塞库。
示例:使用 httpx
进行异步HTTP请求
import asyncio
import httpx
async def fetch(url):
async with httpx.AsyncClient() as client:
response = await client.get(url)
return response.text
async def main():
content = await fetch("https://www.example.com")
print(content)
asyncio.run(main())
常见陷阱与避免方法
1. 未使用 await
调用协程
在调用协程函数时,必须使用await
,否则协程不会执行。
错误示例:
import asyncio
async def greet():
await asyncio.sleep(1)
print("Hello")
async def main():
greet() # 忽略await
asyncio.run(main())
输出:
正确示例:
async def main():
await greet()
2. 混用同步和异步代码
在异步环境中调用同步阻塞代码会阻塞事件循环,影响性能。应尽量保持代码的异步性。
错误示例:
import asyncio
import time
async def main():
time.sleep(2) # 阻塞事件循环
print("完成")
asyncio.run(main())
正确示例:
async def main():
await asyncio.sleep(2)
print("完成")
3. 忘记关闭异步资源
未正确关闭异步资源(如数据库连接、文件)可能导致资源泄漏。使用异步上下文管理器或在适当的位置关闭资源。
示例:
import aiofiles
import asyncio
async def read_file(file_path):
f = await aiofiles.open(file_path, 'r')
contents = await f.read()
await f.close()
print(contents)
更好的方法:
async def read_file(file_path):
async with aiofiles.open(file_path, 'r') as f:
contents = await f.read()
print(contents)
工具与库
1. asyncio
Python标准库,提供了对异步编程的支持,包括事件循环、协程、任务管理等。
- 文档:https://docs.python.org/3/library/asyncio.html
2. aiohttp
用于构建异步HTTP客户端和服务器的库。
- 文档:https://docs.aiohttp.org/en/stable/
3. httpx
一个支持同步和异步的HTTP客户端库,类似于requests
。
- 文档:https://www.python-httpx.org/
4. aiofiles
用于异步文件操作的库。
- 文档:https://github.com/Tinche/aiofiles
5. pytest-asyncio
为pytest
提供异步测试支持的插件。
- 文档:https://github.com/pytest-dev/pytest-asyncio
6. 异步ORM
如SQLAlchemy
的异步支持、Tortoise ORM
等。
- SQLAlchemy 异步文档:https://docs.sqlalchemy.org/en/14/orm/extensions/asyncio.html
- Tortoise ORM 文档:https://tortoise-orm.readthedocs.io/en/latest/
总结与进一步学习
通过本指南,您已经掌握了Python中异步编程的核心概念、基本用法以及在后端开发中的实际应用。以下是学习路径的建议,帮助您进一步提升异步编程技能:
- 深入理解
asyncio
的内部机制- 学习事件循环的工作原理。
- 理解协程的调度与执行。
- 探索高级功能,如子进程管理、信号处理等。
- 实践复杂的异步应用
- 构建包含多个异步组件的复杂项目。
- 实现高并发、高性能的后端服务。
- 掌握异步数据库操作
- 使用异步ORM进行高效的数据库交互。
- 优化数据库查询和事务管理。
- 探索异步微服务架构
- 使用异步编程构建分布式系统。
- 实现服务间的高效通信与协调。
- 性能优化与调试
- 学习异步代码的性能分析与优化方法。
- 使用调试工具和技术排查异步代码中的问题。
- 学习异步安全编程
- 掌握异步环境中的安全性考虑,如认证、授权、数据加密等。
- 实现安全、可靠的异步API服务。
附加资源
- FastAPI 官方文档:https://fastapi.tiangolo.com/
asyncio
官方文档:https://docs.python.org/3/library/asyncio.htmlaiohttp
官方文档:https://docs.aiohttp.org/en/stable/httpx
官方文档:https://www.python-httpx.org/aiofiles
GitHub:https://github.com/Tinche/aiofilespytest-asyncio
GitHub:https://github.com/pytest-dev/pytest-asyncio- SQLAlchemy 异步文档:https://docs.sqlalchemy.org/en/14/orm/extensions/asyncio.html
- Tortoise ORM 文档:https://tortoise-orm.readthedocs.io/en/latest/
- Python 官方文档 - 异步编程:https://docs.python.org/3/library/asyncio.html
上下文管理器
什么是上下文管理器?
上下文管理器是一种对象,它定义了在特定代码块(通常由with
语句包围)开始和结束时要执行的操作。主要用于管理资源,如文件、网络连接、数据库会话等,确保在使用完毕后资源能够被正确释放,避免资源泄漏或其他潜在问题。
关键概念
with
语句:用于包装执行代码块,确保在代码块执行前和执行后执行特定的操作。__enter__
方法:在进入with
块时调用,通常用于初始化资源。__exit__
方法:在退出with
块时调用,无论代码块是否抛出异常,通常用于清理资源。
同步上下文管理器
基本用法
最常见的上下文管理器是用于文件操作的open
函数。使用with
语句可以确保文件在使用后自动关闭。
示例:使用with
打开和读取文件
# 同步上下文管理器示例
with open('example.txt', 'r') as file:
contents = file.read()
print(contents)
# 文件在此自动关闭,无需显式调用 file.close()
解释:
open('example.txt', 'r')
返回一个文件对象,该对象实现了上下文管理器协议。with
语句进入上下文,调用文件对象的__enter__
方法,返回文件对象本身并赋值给变量file
。- 执行
with
块中的代码,读取文件内容并打印。 - 无论
with
块内的代码是否抛出异常,都会调用文件对象的__exit__
方法,确保文件被关闭。
自定义上下文管理器
您可以通过定义__enter__
和__exit__
方法来创建自己的上下文管理器。
示例:创建一个简单的上下文管理器
class SimpleContextManager:
def __enter__(self):
print("进入上下文")
return self # 可以返回需要使用的对象
def __exit__(self, exc_type, exc_val, exc_tb):
print("退出上下文")
if exc_type:
print(f"发生异常: {exc_val}")
# 可以选择是否处理异常,返回True表示异常被处理,不再传播
# 使用自定义上下文管理器
with SimpleContextManager() as scm:
print("在上下文中执行代码")
# 触发异常
# raise ValueError("示例异常")
输出:
进入上下文
在上下文中执行代码
退出上下文
如果取消注释raise ValueError("示例异常")
,输出将包括异常信息:
进入上下文
在上下文中执行代码
发生异常: 示例异常
退出上下文
Traceback (most recent call last):
File "example.py", line 12, in <module>
raise ValueError("示例异常")
ValueError: 示例异常
使用contextlib
模块
Python的contextlib
模块提供了更简洁的方式来创建上下文管理器,尤其适用于简单的场景。
示例:使用contextlib.contextmanager
装饰器
from contextlib import contextmanager
@contextmanager
def my_context():
print("进入上下文")
try:
yield
print("正常退出上下文")
except Exception as e:
print(f"异常退出上下文: {e}")
# 可以选择是否重新抛出异常
finally:
print("清理操作")
# 使用自定义上下文管理器
with my_context():
print("在上下文中执行代码")
# 触发异常
# raise ValueError("示例异常")
输出:
进入上下文
在上下文中执行代码
正常退出上下文
清理操作
若触发异常:
进入上下文
在上下文中执行代码
异常退出上下文: 示例异常
清理操作
Traceback (most recent call last):
File "example.py", line 11, in <module>
raise ValueError("示例异常")
ValueError: 示例异常
异步上下文管理器
在异步编程中,使用异步上下文管理器可以处理异步资源,如异步文件操作、数据库连接等。使用async with
语句和异步上下文管理器协议。
基本用法
示例:使用异步上下文管理器打开异步文件
import aiofiles
import asyncio
async def read_file():
async with aiofiles.open('example.txt', 'r') as file:
contents = await file.read()
print(contents)
asyncio.run(read_file())
解释:
aiofiles.open
返回一个异步文件对象,实现了异步上下文管理器协议。async with
语句进入异步上下文,调用文件对象的__aenter__
方法,返回文件对象本身并赋值给变量file
。- 执行
async with
块中的代码,异步读取文件内容并打印。 - 无论
async with
块内的代码是否抛出异常,都会调用文件对象的__aexit__
方法,确保文件被关闭。
自定义异步上下文管理器
您可以通过定义__aenter__
和__aexit__
方法来创建自己的异步上下文管理器。
示例:创建一个简单的异步上下文管理器
class AsyncContextManager:
async def __aenter__(self):
print("异步进入上下文")
return self # 可以返回需要使用的对象
async def __aexit__(self, exc_type, exc_val, exc_tb):
print("异步退出上下文")
if exc_type:
print(f"发生异常: {exc_val}")
# 可以选择是否处理异常,返回True表示异常被处理,不再传播
# 使用自定义异步上下文管理器
async def main():
async with AsyncContextManager() as acm:
print("在异步上下文中执行代码")
# 触发异常
# raise ValueError("示例异步异常")
asyncio.run(main())
输出:
异步进入上下文
在异步上下文中执行代码
异步退出上下文
若触发异常:
异步进入上下文
在异步上下文中执行代码
发生异常: 示例异步异常
异步退出上下文
Traceback (most recent call last):
File "example.py", line 17, in main
raise ValueError("示例异步异常")
ValueError: 示例异步异常
使用contextlib
的asynccontextmanager
装饰器
contextlib
模块同样支持异步上下文管理器,提供了asynccontextmanager
装饰器。
示例:使用contextlib.asynccontextmanager
装饰器
from contextlib import asynccontextmanager
import asyncio
@asynccontextmanager
async def my_async_context():
print("异步进入上下文")
try:
yield
print("正常异步退出上下文")
except Exception as e:
print(f"异常异步退出上下文: {e}")
finally:
print("异步清理操作")
# 使用自定义异步上下文管理器
async def main():
async with my_async_context():
print("在异步上下文中执行代码")
# 触发异常
# raise ValueError("示例异步异常")
asyncio.run(main())
输出:
异步进入上下文
在异步上下文中执行代码
正常异步退出上下文
异步清理操作
若触发异常:
异步进入上下文
在异步上下文中执行代码
异常异步退出上下文: 示例异步异常
异步清理操作
Traceback (most recent call last):
File "example.py", line 14, in main
raise ValueError("示例异步异常")
ValueError: 示例异步异常
上下文管理器在后端开发中的应用
上下文管理器在后端开发中有广泛的应用,尤其是在资源管理和确保操作的原子性方面。以下是一些典型的应用场景:
1. 数据库连接管理
使用上下文管理器管理数据库会话,确保在操作完成后会话被正确关闭。
示例:使用异步SQLAlchemy管理数据库会话
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
from contextlib import asynccontextmanager
DATABASE_URL = "postgresql+asyncpg://user:password@localhost/dbname"
engine = create_async_engine(DATABASE_URL, echo=True)
AsyncSessionLocal = sessionmaker(
bind=engine, class_=AsyncSession, expire_on_commit=False
)
@asynccontextmanager
async def get_db():
async with AsyncSessionLocal() as session:
try:
yield session
except Exception:
await session.rollback()
raise
finally:
await session.close()
# 使用上下文管理器获取数据库会话
async def get_user(user_id: int):
async with get_db() as db:
result = await db.execute(select(User).where(User.id == user_id))
user = result.scalars().first()
return user
2. 文件操作
确保文件在读写后被正确关闭,避免文件句柄泄漏。
示例:异步文件写入
import aiofiles
import asyncio
async def write_file(file_path, content):
async with aiofiles.open(file_path, 'w') as f:
await f.write(content)
asyncio.run(write_file('example.txt', 'Hello, Async World!'))
3. 资源锁
在并发环境中使用上下文管理器管理资源锁,确保线程安全。
示例:使用异步锁
import asyncio
lock = asyncio.Lock()
async def critical_section(name):
async with lock:
print(f"{name} 获取锁")
await asyncio.sleep(1)
print(f"{name} 释放锁")
async def main():
await asyncio.gather(
critical_section("任务1"),
critical_section("任务2"),
critical_section("任务3"),
)
asyncio.run(main())
输出:
任务1 获取锁
任务1 释放锁
任务2 获取锁
任务2 释放锁
任务3 获取锁
任务3 释放锁
4. 日志记录
使用上下文管理器确保日志文件在写入后被正确关闭。
示例:异步日志记录
import aiofiles
import asyncio
async def log_message(file_path, message):
async with aiofiles.open(file_path, 'a') as f:
await f.write(message + '\n')
async def main():
await log_message('log.txt', '这是一条日志消息。')
asyncio.run(main())
总结
上下文管理器是Python中管理资源的关键工具,能够确保在资源使用后进行正确的清理操作,避免资源泄漏和其他潜在问题。在后端开发中,无论是同步编程还是异步编程,上下文管理器都在确保代码的健壮性和可维护性方面发挥着重要作用。
关键要点回顾
- 上下文管理器协议:
- 同步上下文管理器需要实现
__enter__
和__exit__
方法。 - 异步上下文管理器需要实现
__aenter__
和__aexit__
方法。
- 同步上下文管理器需要实现
- 使用
with
和async with
:with
用于同步上下文管理器。async with
用于异步上下文管理器。
- 创建自定义上下文管理器:
- 通过定义类的方法或使用
contextlib
模块的装饰器。
- 通过定义类的方法或使用
- 实际应用场景:
- 文件操作、数据库连接管理、资源锁、日志记录等。
- 最佳实践:
- 始终使用上下文管理器来管理资源。
- 尽量使用异步库支持的异步上下文管理器,避免阻塞事件循环。
- 正确处理异常,确保资源在异常情况下也能被释放。
ORM
ORM(对象关系映射,Object-Relational Mapping) 是一种编程技术,用于在面向对象的编程语言(如Python)和关系型数据库(如MySQL、PostgreSQL、SQLite等)之间进行数据转换和交互。ORM通过将数据库中的表映射为编程语言中的类,将表中的记录映射为类的实例,使开发者能够以面向对象的方式操作数据库,而无需编写复杂的SQL语句。
目录
- 什么是ORM?
- ORM的工作原理
- ORM的优缺点
- Python中的常用ORM库
- 使用SQLAlchemy的基本示例
- ORM在FastAPI中的应用
- 何时使用ORM,何时使用原生SQL?
- 总结
什么是ORM?
ORM(对象关系映射) 是一种编程技术,通过将数据库中的表和记录与编程语言中的类和对象进行映射,使开发者能够使用面向对象的方式来操作数据库。ORM隐藏了底层的数据库交互细节,允许开发者专注于业务逻辑而无需编写大量的SQL语句。
核心概念
- 模型(Model):对应数据库中的表,每个模型类代表一个表。
- 实例(Instance):对应表中的一条记录,每个实例代表一行数据。
- 字段(Field):对应表中的列,每个类属性代表一个列。
- 会话(Session):用于管理和执行数据库操作的上下文。
示意图
关系型数据库 Python对象
+-----------+ +-----------+
| Users | | User |
+-----------+ +-----------+
| id (PK) |<---------> | id |
| name | | name |
| email | | email |
+-----------+ +-----------+
ORM的工作原理
ORM通过映射机制,将数据库中的表转换为编程语言中的类,将表中的记录转换为类的实例。这样,开发者可以使用类的方法和属性来进行数据库操作,而不需要直接编写SQL语句。
步骤
- 定义模型类:创建与数据库表对应的类,定义类的属性对应表的字段。
- 创建数据库连接:配置数据库连接信息。
- 创建表:根据模型类在数据库中创建对应的表结构。
- 执行操作:使用ORM提供的方法进行CRUD(创建、读取、更新、删除)操作。
示例流程
- 定义一个
User
模型类。 - 创建数据库连接。
- 使用ORM创建
users
表。 - 使用
User
类创建、查询、更新和删除用户记录。
ORM的优缺点
优点
- 简化开发:减少了编写SQL语句的需求,使用面向对象的方式操作数据库。
- 提高生产力:通过模型类的定义和操作,开发速度更快。
- 跨数据库支持:大多数ORM支持多种数据库,方便切换数据库类型。
- 安全性:ORM自动处理SQL注入等安全问题,提供更安全的数据库操作方式。
- 维护性强:通过模型类的定义,代码更具可读性和可维护性。
缺点
- 性能开销:ORM的抽象层会带来一定的性能开销,尤其在复杂查询时。
- 学习曲线:需要学习ORM的使用方法和配置,初学者可能有一定难度。
- 灵活性限制:对于一些复杂的、特定的SQL查询,ORM可能不够灵活,需要手动编写SQL。
- 调试困难:错误信息可能不如直接的SQL错误直观,调试时可能较为复杂。
Python中的常用ORM库
SQLAlchemy
SQLAlchemy是Python中最流行和功能强大的ORM库,支持多种数据库,并提供了灵活的查询构建器。它分为两个主要部分:
- 核心(Core):提供底层的数据库抽象和SQL构建功能。
- ORM(Object Relational Mapper):在核心基础上构建,提供面向对象的数据库操作接口。
特点:
- 强大的查询构建器
- 支持多种数据库
- 灵活且可扩展
Django ORM
Django是一个高级的Python Web框架,其内置的ORM提供了简洁而强大的数据库操作接口。适用于Django项目,但也可以在其他项目中单独使用。
特点:
- 与Django框架紧密集成
- 简洁易用
- 自动生成数据库迁移
Tortoise ORM
Tortoise ORM是一个轻量级的、异步的ORM库,专为异步编程设计,适用于如FastAPI等异步Web框架。
特点:
- 支持异步操作
- 简洁易用
- 类似Django ORM的设计
使用SQLAlchemy的基本示例
以下将通过一个简单的示例,演示如何使用SQLAlchemy进行基本的数据库操作。
安装SQLAlchemy
首先,确保已安装SQLAlchemy。如果使用异步操作,还需要安装asyncpg
或其他数据库驱动。
pip install sqlalchemy
如果需要使用SQLite(不需要额外驱动):
pip install sqlalchemy
定义模型
创建一个User
模型类,对应数据库中的users
表。
# models.py
from sqlalchemy import Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
class User(Base):
__tablename__ = 'users' # 数据库中的表名
id = Column(Integer, primary_key=True, index=True)
name = Column(String, index=True)
email = Column(String, unique=True, index=True)
def __repr__(self):
return f"<User(name={self.name}, email={self.email})>"
创建数据库连接
配置数据库连接并创建引擎。
# database.py
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from models import Base
DATABASE_URL = "sqlite:///./test.db" # SQLite数据库,其他数据库如PostgreSQL等格式不同
# 创建引擎
engine = create_engine(
DATABASE_URL, connect_args={"check_same_thread": False} # SQLite特有参数
)
# 创建会话工厂
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# 创建数据库表
Base.metadata.create_all(bind=engine)
执行CRUD操作
使用会话(Session)进行数据库操作。
# main.py
from sqlalchemy.orm import Session
from models import User
from database import SessionLocal
def create_user(db: Session, name: str, email: str):
db_user = User(name=name, email=email)
db.add(db_user)
db.commit()
db.refresh(db_user) # 更新对象,使其包含数据库生成的ID
return db_user
def get_user(db: Session, user_id: int):
return db.query(User).filter(User.id == user_id).first()
def get_users(db: Session, skip: int = 0, limit: int = 10):
return db.query(User).offset(skip).limit(limit).all()
def update_user(db: Session, user_id: int, name: str = None, email: str = None):
user = db.query(User).filter(User.id == user_id).first()
if user:
if name:
user.name = name
if email:
user.email = email
db.commit()
db.refresh(user)
return user
def delete_user(db: Session, user_id: int):
user = db.query(User).filter(User.id == user_id).first()
if user:
db.delete(user)
db.commit()
return user
# 示例用法
if __name__ == "__main__":
db = SessionLocal()
# 创建用户
user = create_user(db, name="Alice", email="alice@example.com")
print(f"创建用户: {user}")
# 获取用户
retrieved_user = get_user(db, user_id=user.id)
print(f"获取用户: {retrieved_user}")
# 更新用户
updated_user = update_user(db, user_id=user.id, name="Alice Smith")
print(f"更新用户: {updated_user}")
# 获取所有用户
users = get_users(db)
print(f"所有用户: {users}")
# 删除用户
delete_user(db, user_id=user.id)
print(f"删除用户: {user.id}")
db.close()
运行示例
确保所有文件(models.py
、database.py
、main.py
)在同一目录下,然后运行main.py
:
python main.py
输出示例:
创建用户: <User(name=Alice, email=alice@example.com)>
获取用户: <User(name=Alice, email=alice@example.com)>
更新用户: <User(name=Alice Smith, email=alice@example.com)>
所有用户: [<User(name=Alice Smith, email=alice@example.com)>]
删除用户: 1
ORM在FastAPI中的应用
在FastAPI中,ORM(如SQLAlchemy)常用于数据库交互。以下是如何在FastAPI项目中集成SQLAlchemy的基本步骤。
项目结构
fastapi-sqlalchemy/
├── app/
│ ├── __init__.py
│ ├── main.py
│ ├── models.py
│ ├── schemas.py
│ ├── database.py
│ └── crud.py
├── requirements.txt
└── README.md
1. 安装依赖
pip install fastapi sqlalchemy uvicorn
2. 定义模型
# app/models.py
from sqlalchemy import Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True, index=True)
name = Column(String, index=True)
email = Column(String, unique=True, index=True)
3. 配置数据库
# app/database.py
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from app.models import Base
DATABASE_URL = "sqlite:///./test.db"
engine = create_engine(
DATABASE_URL, connect_args={"check_same_thread": False} # SQLite特有参数
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# 创建数据库表
Base.metadata.create_all(bind=engine)
4. 定义Pydantic模式
# app/schemas.py
from pydantic import BaseModel
class UserBase(BaseModel):
name: str
email: str
class UserCreate(UserBase):
pass
class UserResponse(UserBase):
id: int
class Config:
orm_mode = True
5. 定义CRUD操作
# app/crud.py
from sqlalchemy.orm import Session
from app.models import User
from app.schemas import UserCreate
def get_user(db: Session, user_id: int):
return db.query(User).filter(User.id == user_id).first()
def get_user_by_email(db: Session, email: str):
return db.query(User).filter(User.email == email).first()
def get_users(db: Session, skip: int = 0, limit: int = 10):
return db.query(User).offset(skip).limit(limit).all()
def create_user(db: Session, user: UserCreate):
db_user = User(name=user.name, email=user.email)
db.add(db_user)
db.commit()
db.refresh(db_user)
return db_user
6. 创建FastAPI应用并集成ORM
# app/main.py
from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy.orm import Session
from app import models, schemas, crud
from app.database import SessionLocal, engine
from typing import List
# 创建数据库表
models.Base.metadata.create_all(bind=engine)
app = FastAPI()
# 依赖项,获取数据库会话
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
@app.post("/users/", response_model=schemas.UserResponse)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
db_user = crud.get_user_by_email(db, email=user.email)
if db_user:
raise HTTPException(status_code=400, detail="Email already registered")
return crud.create_user(db=db, user=user)
@app.get("/users/{user_id}", response_model=schemas.UserResponse)
def read_user(user_id: int, db: Session = Depends(get_db)):
db_user = crud.get_user(db, user_id=user_id)
if db_user is None:
raise HTTPException(status_code=404, detail="User not found")
return db_user
@app.get("/users/", response_model=List[schemas.UserResponse])
def read_users(skip: int = 0, limit: int = 10, db: Session = Depends(get_db)):
users = crud.get_users(db, skip=skip, limit=limit)
return users
7. 运行应用
使用Uvicorn运行FastAPI应用:
uvicorn app.main:app --reload
访问 http://127.0.0.1:8000/docs 查看自动生成的Swagger UI文档,测试API端点。
8. 示例操作
**创建用户:**发送POST请求到
/users/
,Body:{ "name": "Alice", "email": "alice@example.com" }
获取用户:发送GET请求到
/users/1
获取所有用户:发送GET请求到
/users/
何时使用ORM,何时使用原生SQL?
使用ORM的场景
- 快速开发:当需要快速构建应用,且业务逻辑主要是CRUD操作时,ORM能够显著提高开发效率。
- 维护性强:ORM通过模型类的定义,使代码更具可读性和可维护性,适合团队协作开发。
- 跨数据库支持:需要支持多种数据库,ORM提供了抽象层,简化数据库切换。
- 安全性:ORM自动处理SQL注入等安全问题,减少手动编写SQL带来的风险。
使用原生SQL的场景
- 复杂查询:当需要执行复杂的SQL查询,如多表连接、子查询、窗口函数等,原生SQL更为灵活和高效。
- 性能优化:对于性能要求极高的操作,手写优化的SQL可以更好地控制查询性能。
- 特定数据库功能:当需要使用数据库特有的功能或优化时,原生SQL更为适合。
- 学习和理解:在学习数据库和SQL的基础知识时,编写原生SQL有助于加深理解。
混合使用ORM和原生SQL
在实际项目中,可以根据需求混合使用ORM和原生SQL。例如,使用ORM进行常规的CRUD操作,使用原生SQL处理复杂查询或性能关键的部分。
示例:在SQLAlchemy中执行原生SQL
from sqlalchemy import text
from sqlalchemy.orm import Session
def get_users_with_native_sql(db: Session):
query = text("SELECT * FROM users WHERE name LIKE :name")
result = db.execute(query, {"name": "%Alice%"})
return result.fetchall()
总结
ORM(对象关系映射) 是一种强大的编程技术,通过将数据库表和记录映射为编程语言中的类和对象,简化了数据库操作,提升了开发效率和代码的可维护性。虽然ORM带来了许多优点,但在某些特定场景下,原生SQL仍然不可或缺。理解ORM的工作原理和适用场景,有助于在实际项目中做出更合适的技术选择。
关键要点回顾
- ORM的基本概念:通过模型类和对象操作数据库,无需编写SQL语句。
- ORM的优缺点:简化开发、提高生产力,但可能带来性能开销和灵活性限制。
- 常用的Python ORM库:SQLAlchemy、Django ORM、Tortoise ORM等。
- 使用SQLAlchemy的基本流程:定义模型、配置数据库、执行CRUD操作。
- ORM在FastAPI中的集成:通过依赖注入和会话管理,实现高效的数据库交互。
- ORM与原生SQL的选择:根据具体需求选择合适的数据库操作方式,灵活组合使用。
位置
在标准的Python后端开发中,遵循良好的代码组织和格式化规范不仅能提高代码的可读性和可维护性,还能促进团队协作。以下内容将详细介绍变量、类、方法的放置位置以及代码之间的间距,主要基于PEP 8(Python的官方编码规范)进行解释。
Python 后端开发中的代码组织与格式化规范
目录
代码组织概述
在Python中,代码通常组织在模块(.py文件)和包(包含__init__.py
的文件夹)中。良好的代码组织结构有助于模块化、重用和维护。以下是常见的代码组织层次:
- 模块(Module):一个
.py
文件,包含变量、函数、类等。 - 包(Package):一个包含
__init__.py
文件的文件夹,可以包含多个模块和子包。
示意结构:
my_project/
├── app/
│ ├── __init__.py
│ ├── models.py
│ ├── schemas.py
│ ├── views.py
│ └── utils.py
├── tests/
│ ├── __init__.py
│ ├── test_models.py
│ └── test_views.py
├── requirements.txt
└── README.md
变量的放置位置
全局变量
全局变量是在模块级别定义的变量,通常用于模块内部共享数据。在后端开发中,应尽量减少全局变量的使用,以避免命名冲突和不必要的耦合。
示例:
# app/config.py
# 不推荐:过多使用全局变量
API_KEY = "your_api_key_here"
TIMEOUT = 30
模块级变量
模块级变量是在模块的顶部定义的变量,通常用于配置或常量。这些变量可以在模块的任何位置访问。
示例:
# app/settings.py
DATABASE_URL = "sqlite:///./test.db"
SECRET_KEY = "your_secret_key_here"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
类变量与实例变量
- 类变量:属于类本身,由所有实例共享。
- 实例变量:属于类的每个实例,独立存储数据。
示例:
# app/models.py
class User:
# 类变量
user_count = 0
def __init__(self, name, email):
# 实例变量
self.name = name
self.email = email
User.user_count += 1
常量
在Python中,常量通常使用全大写字母命名,位于模块的顶部。
示例:
# app/constants.py
MAX_CONNECTIONS = 100
DEFAULT_TIMEOUT = 30
类的放置位置
类定义的顺序
在一个模块中,通常将类定义放在函数定义之前,尤其是当函数依赖于类时。这有助于提升代码的可读性和逻辑性。
示例:
# app/models.py
from sqlalchemy import Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True, index=True)
name = Column(String, index=True)
email = Column(String, unique=True, index=True)
方法的放置位置
实例方法
实例方法是属于类的每个实例的方法,通常用于操作实例变量。
示例:
class User:
def __init__(self, name):
self.name = name
def greet(self):
return f"Hello, {self.name}!"
类方法与静态方法
类方法:使用
@classmethod
装饰器,接受类作为第一个参数(通常命名为cls
)。适用于需要访问或修改类状态的方法。示例:
class User: user_count = 0 @classmethod def increment_user_count(cls): cls.user_count += 1
静态方法:使用
@staticmethod
装饰器,不接受类或实例作为参数。适用于独立于类和实例的功能。示例:
class Utils: @staticmethod def add(a, b): return a + b
代码之间的间距
全局导入
- 导入语句应该位于文件顶部,紧跟着模块注释和文档字符串(如果有的话)。
- 导入顺序应遵循以下顺序:
- 标准库导入
- 第三方库导入
- 本地应用/库的导入
- 每个导入组之间应使用一个空行分隔。
示例:
# app/main.py
# 标准库导入
import asyncio
import logging
# 第三方库导入
from fastapi import FastAPI
from sqlalchemy.orm import Session
# 本地应用导入
from . import models, schemas, crud
from .database import SessionLocal, engine
模块级别
- 模块级别的变量、类和函数定义之间应该使用两个空行分隔。
示例:
# app/models.py
from sqlalchemy import Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True, index=True)
name = Column(String, index=True)
email = Column(String, unique=True, index=True)
class Item(Base):
__tablename__ = 'items'
id = Column(Integer, primary_key=True, index=True)
title = Column(String, index=True)
description = Column(String, index=True)
类与函数之间
- 类定义之间应使用两个空行分隔。
- 类内部的方法之间应使用一个空行分隔。
示例:
# app/views.py
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from . import models, schemas, crud
from .database import SessionLocal
router = APIRouter()
@router.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(SessionLocal)):
db_user = crud.get_user_by_email(db, email=user.email)
if db_user:
raise HTTPException(status_code=400, detail="Email already registered")
return crud.create_user(db=db, user=user)
@router.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(SessionLocal)):
db_user = crud.get_user(db, user_id=user_id)
if db_user is None:
raise HTTPException(status_code=404, detail="User not found")
return db_user
类内部方法之间
- 类内部的方法之间使用一个空行分隔。
示例:
class User:
def __init__(self, name, email):
self.name = name
self.email = email
def greet(self):
return f"Hello, {self.name}!"
def update_email(self, new_email):
self.email = new_email
控制流语句之间
- 控制流语句(如
if
、for
、while
等)之间一般不需要额外空行,但代码块内部应保持一致的缩进和空格使用。
示例:
def process_items(items):
for item in items:
if item.is_valid():
item.process()
else:
item.skip()
命名规范
遵循PEP 8的命名约定,有助于保持代码的一致性和可读性。
变量名:小写字母,单词之间使用下划线分隔(snake_case)。
user_name = "Alice" email_address = "alice@example.com"
函数名:小写字母,单词之间使用下划线分隔(snake_case)。
def calculate_total(): pass
类名:使用驼峰命名法,每个单词首字母大写(CamelCase)。
class UserProfile: pass
常量:全大写字母,单词之间使用下划线分隔(UPPER_SNAKE_CASE)。
MAX_CONNECTIONS = 100 DEFAULT_TIMEOUT = 30
模块名和包名:小写字母,必要时使用下划线分隔(snake_case)。
my_module.py my_package/
私有属性和方法:在名称前加单个或双下划线(或_),表示私有。
class Example: def __init__(self): self._private_attr = "secret" def _private_method(self): pass
示例代码
以下是一个整合了上述规范的示例项目,展示了变量、类、方法的正确放置以及代码间距的应用。
my_project/
├── app/
│ ├── __init__.py
│ ├── main.py
│ ├── models.py
│ ├── schemas.py
│ ├── crud.py
│ ├── database.py
│ └── routers/
│ ├── __init__.py
│ └── user.py
├── tests/
│ ├── __init__.py
│ └── test_user.py
├── requirements.txt
└── README.md
1. app/database.py
# app/database.py
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from .models import Base
DATABASE_URL = "sqlite:///./test.db"
# 创建数据库引擎
engine = create_engine(
DATABASE_URL, connect_args={"check_same_thread": False} # SQLite特有参数
)
# 创建会话工厂
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# 创建所有数据库表
Base.metadata.create_all(bind=engine)
2. app/models.py
# app/models.py
from sqlalchemy import Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True, index=True)
name = Column(String, index=True)
email = Column(String, unique=True, index=True)
def __repr__(self):
return f"<User(name={self.name}, email={self.email})>"
3. app/schemas.py
# app/schemas.py
from pydantic import BaseModel, EmailStr
class UserBase(BaseModel):
name: str
email: EmailStr
class UserCreate(UserBase):
pass
class UserResponse(UserBase):
id: int
class Config:
orm_mode = True
4. app/crud.py
# app/crud.py
from sqlalchemy.orm import Session
from .models import User
from .schemas import UserCreate
def get_user(db: Session, user_id: int):
return db.query(User).filter(User.id == user_id).first()
def get_user_by_email(db: Session, email: str):
return db.query(User).filter(User.email == email).first()
def get_users(db: Session, skip: int = 0, limit: int = 10):
return db.query(User).offset(skip).limit(limit).all()
def create_user(db: Session, user: UserCreate):
db_user = User(name=user.name, email=user.email)
db.add(db_user)
db.commit()
db.refresh(db_user)
return db_user
5. app/routers/user.py
# app/routers/user.py
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from typing import List
from .. import models, schemas, crud
from ..database import SessionLocal
router = APIRouter(
prefix="/users",
tags=["users"],
responses={404: {"description": "Not found"}},
)
# 依赖项,获取数据库会话
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
@router.post("/", response_model=schemas.UserResponse)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
db_user = crud.get_user_by_email(db, email=user.email)
if db_user:
raise HTTPException(status_code=400, detail="Email already registered")
return crud.create_user(db=db, user=user)
@router.get("/{user_id}", response_model=schemas.UserResponse)
def read_user(user_id: int, db: Session = Depends(get_db)):
db_user = crud.get_user(db, user_id=user_id)
if db_user is None:
raise HTTPException(status_code=404, detail="User not found")
return db_user
@router.get("/", response_model=List[schemas.UserResponse])
def read_users(skip: int = 0, limit: int = 10, db: Session = Depends(get_db)):
users = crud.get_users(db, skip=skip, limit=limit)
return users
6. app/main.py
# app/main.py
from fastapi import FastAPI
from .routers import user
app = FastAPI(
title="User Management API",
description="A simple API to manage users.",
version="1.0.0",
)
# 包含路由
app.include_router(user.router)
@app.get("/")
def read_root():
return {"message": "Welcome to the User Management API!"}
7. tests/test_user.py
# tests/test_user.py
from fastapi.testclient import TestClient
from app.main import app
from app.database import Base, engine, SessionLocal
from app.models import User
from sqlalchemy.orm import Session
import pytest
client = TestClient(app)
# 测试数据库设置
@pytest.fixture(scope="module")
def test_db():
# 创建数据库表
Base.metadata.create_all(bind=engine)
db = SessionLocal()
yield db
db.close()
# 删除数据库表
Base.metadata.drop_all(bind=engine)
def test_create_user(test_db: Session):
response = client.post(
"/users/",
json={"name": "Alice", "email": "alice@example.com"}
)
assert response.status_code == 200
assert response.json()["name"] == "Alice"
assert response.json()["email"] == "alice@example.com"
def test_create_existing_user(test_db: Session):
response = client.post(
"/users/",
json={"name": "Alice", "email": "alice@example.com"}
)
assert response.status_code == 400
assert response.json()["detail"] == "Email already registered"
def test_read_user(test_db: Session):
response = client.get("/users/1")
assert response.status_code == 200
assert response.json()["name"] == "Alice"
assert response.json()["email"] == "alice@example.com"
def test_read_nonexistent_user(test_db: Session):
response = client.get("/users/999")
assert response.status_code == 404
assert response.json()["detail"] == "User not found"
def test_read_users(test_db: Session):
response = client.get("/users/")
assert response.status_code == 200
assert isinstance(response.json(), list)
assert len(response.json()) >= 1
总结与最佳实践
1. 遵循PEP 8规范
- 代码风格:使用四个空格进行缩进,限制每行不超过79个字符,适当使用空行分隔逻辑块等。
- 工具:使用
flake8
、black
等工具自动检查和格式化代码。
2. 模块化设计
- 职责单一:每个模块和包应专注于特定的功能,如
models
用于定义数据库模型,schemas
用于定义数据验证模型,crud
用于数据库操作,routers
用于API路由等。 - 重用性:通过模块化设计,提高代码的重用性和可维护性。
3. 清晰的命名
- 描述性:变量、函数、类的名称应具有描述性,能准确反映其用途和功能。
- 一致性:在整个项目中保持一致的命名风格。
4. 依赖注入
数据库会话:通过依赖项注入管理数据库会话,确保每个请求都有独立的会话,并在请求结束后正确关闭。
示例:
def get_db(): db = SessionLocal() try: yield db finally: db.close()
5. 错误处理
- 异常捕捉:使用
try-except
块捕捉和处理可能的异常,确保API能够返回有意义的错误信息。 - 统一响应:为不同类型的错误定义统一的响应格式,提升API的可预测性。
6. 测试覆盖
- 单元测试:编写覆盖关键功能的单元测试,确保代码的正确性。
- 集成测试:测试模块之间的交互,确保整体系统的稳定性。
7. 文档化
- 自动文档:利用FastAPI的自动文档生成功能(如Swagger UI),确保API文档与代码同步。
- 注释:在关键代码段添加注释,解释复杂的逻辑或设计选择。
8. 安全性
- 输入验证:使用Pydantic模型验证输入数据,防止恶意数据注入。
- 认证与授权:实现适当的认证和授权机制,保护API端点。
- 环境变量管理:将敏感信息(如数据库URL、密钥)存储在环境变量或配置文件中,避免硬编码。
9. 性能优化
- 数据库查询优化:使用索引、避免N+1查询等方法优化数据库性能。
- 缓存:在适当的场景下使用缓存机制,减少数据库负载和响应时间。
10. 持续集成与部署
- CI/CD:设置持续集成和部署流水线,自动化测试和部署流程,提升开发效率和代码质量。
- 容器化:使用Docker等工具容器化应用,确保环境一致性和可移植性。
附加资源
- PEP 8 官方文档:PEP 8 – Style Guide for Python Code
- FastAPI 官方文档:FastAPI Documentation
- SQLAlchemy 官方文档:SQLAlchemy Documentation
- Black 代码格式化工具:Black Documentation
- Flake8 代码检查工具:Flake8 Documentation
- pytest 官方文档:pytest Documentation
- Django ORM 文档:Django ORM Documentation
- Tortoise ORM 文档:Tortoise ORM Documentation
进一步学习与实践
- 深入理解PEP 8:全面阅读并理解PEP 8的所有细节,确保代码风格的一致性。
- 探索高级ORM功能:
- 关系映射:学习如何处理一对多、多对多等复杂关系。
- 查询优化:掌握如何编写高效的查询,避免性能瓶颈。
- 实践项目开发:通过构建实际项目,如博客系统、电商平台等,应用所学的代码组织和ORM知识。
- 学习异步编程:结合异步编程(如使用FastAPI的异步特性)提升应用的性能和响应性。
- 参与开源项目:通过贡献开源项目,获取实际开发经验和社区反馈。
- 持续集成与部署:学习并实践CI/CD工具,如GitHub Actions、GitLab CI等,自动化测试和部署流程。