FastAPI系列14:API限流与暴力破解防护

发布于:2025-05-08 ⋅ 阅读:(24) ⋅ 点赞:(0)


FastAPI系列12:使用JWT 登录认证和RBAC 权限控制FastAPI系列13:API的安全防护 两节中,我们讨论了FastAPI中基本的安全防护技术,本节我们继续讨论API限流与暴力破解的防护。

1、暴力破解防护策略

“暴力破解”是一种常见的攻击手段,在针对web api攻击中,一般会使用这种手段对应用系统的认证信息进行获取。 其过程就是使用大量的认证信息在认证接口进行尝试登录,直到得到正确的结果。
一般情况下,针对“暴力破解”攻击,我们可以采用以下策略:

  1. 限流(Rate Limiting)
    限制每个 IP、用户、接口单位时间内的访问频率,例如:登录接口每分钟最多 5 次。

  2. 验证码(Captcha)

    登录失败次数多时要求验证,防止机器人攻击。

  3. 登录失败锁定机制

    多次失败后暂时冻结账户或 IP。

2、接口限流(Rate Limiting)

在 FastAPI 中实现接口限流(Rate Limiting),常用方法是使用 slowapi 这个库,它基于 Starlette 和 Redis。以下是一个完整的实现过程:

from fastapi import FastAPI, Request
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded

# 创建限流器
limiter = Limiter(key_func=get_remote_address)

# 创建应用
app = FastAPI()
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

# 应用限流装饰器(例如每分钟最多访问 5 次)
@app.get("/limited")
@limiter.limit("5/minute")
def limited_endpoint(request: Request):
    return {"message": "This endpoint is rate-limited"}

限流策略语法

  • “1/second”:每秒 1 次
  • “5/minute”:每分钟 5 次
  • “100/hour”:每小时 100 次

3、分布式限流

在上面示例的基础上,我们可以进一步配合使用Redis 实现更大规模的分布式限流。

from fastapi import FastAPI, Request
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
from redis import Redis

# 初始化 Redis 连接
redis_client = Redis(host='localhost', port=6379, db=0)

# 初始化限流器,使用 Redis 存储
limiter = Limiter(key_func=get_remote_address, storage_uri="redis://localhost:6379")

# 创建 FastAPI 应用
app = FastAPI()
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

# 设置限流:同一 IP 每分钟最多访问 10 次
@app.get("/api")
@limiter.limit("10/minute")
def limited_api(request: Request):
    return {"message": "You are within the rate limit."}

4、按用户 Token 限流

在 FastAPI 中按用户 Token(如 JWT)限流,可以自定义 key_func,提取请求头中的用户标识作为限流键。以下是实现示例:

from fastapi import FastAPI, Request, Header, HTTPException
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded
from slowapi.util import get_remote_address
from starlette.responses import JSONResponse
import jwt  # 使用 PyJWT

# 模拟解析 JWT(请替换为你自己的解析逻辑)
SECRET = "your-secret-key"
def get_user_id_from_token(authorization: str):
    try:
        token = authorization.split(" ")[1]
        payload = jwt.decode(token, SECRET, algorithms=["HS256"])
        return payload["sub"]  # 假设用户 ID 存在 sub 字段中
    except Exception:
        return "anonymous"  # 未登录用户或异常,统一使用默认限流键

# 自定义限流 key 函数
def token_key_func(request: Request):
    auth = request.headers.get("Authorization")
    return get_user_id_from_token(auth) if auth else get_remote_address(request)

# 初始化限流器
limiter = Limiter(key_func=token_key_func, storage_uri="redis://localhost:6379")

# 初始化应用
app = FastAPI()
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

# 接口限流(按用户 ID)
@app.get("/user-api")
@limiter.limit("5/minute")
def user_api(request: Request):
    return {"message": "You are within your user-specific rate limit."}

上面的示例可以实现:

  • 每个用户(Token)单独限流;
  • 未登录用户使用 IP 地址限流;
  • 支持 JWT、Session 或其他认证方式。

5、验证码

在 FastAPI 中可以使用captcha、Pillow实现图形验证码,以结合上面提到的限流技术实现对接口的暴力破解防护。当然要实现更大规模的防护能力时,往往需要借助第三方专业平台。
以下为利用captcha库实现的图形验证码:

from fastapi import FastAPI, Response
from captcha.image import ImageCaptcha
import random
import string
from io import BytesIO

app = FastAPI()

@app.get("/captcha")
def get_captcha():
    image = ImageCaptcha(width=160, height=60)
    code = ''.join(random.choices(string.ascii_uppercase + string.digits, k=5))
    data = image.generate(code)
    
    # 存入 Redis 或 Session,供后续验证
    # redis.setex("captcha:client_ip", 300, code.lower())

    return Response(content=data.read(), media_type="image/png")

在实际使用中,我们需要把验证码文本(如 ABCD1)保存在服务端(如 Redis),并在登录时比对。


网站公告

今日签到

点亮在社区的每一天
去签到