Backend optimization strategies for receiving a large batch of uploaded images at once

Published: 2025-08-01

1. Chunked receiving and streaming processing

Use streaming so that the uploads are written straight to disk instead of all being loaded into memory:

from flask import Flask, request
from werkzeug.utils import secure_filename
import os

app = Flask(__name__)
app.config['UPLOAD_FOLDER'] = '/tmp/uploads'  # adjust to your storage path

ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif', 'webp'}

def allowed_file(filename):
    return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS

@app.route('/upload', methods=['POST'])
def upload_images():
    uploaded_files = request.files.getlist("images")
    
    # Stream each file to disk instead of loading them all into memory at once
    for file in uploaded_files:
        if file and allowed_file(file.filename):
            filename = secure_filename(file.filename)
            # Save directly to disk; werkzeug spools large uploads to a temp file
            file.save(os.path.join(app.config['UPLOAD_FOLDER'], filename))
    
    return {'status': 'success', 'count': len(uploaded_files)}
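
If you want explicit control over how much of each upload sits in memory, the same idea can be written as a manual chunked copy of the upload stream. A minimal sketch; the helper name and 1 MB chunk size are illustrative, not part of the original code:

def save_stream_in_chunks(file, dest_path, chunk_size=1024 * 1024):
    # Copy the werkzeug FileStorage stream to disk 1 MB at a time,
    # so only one chunk per file is ever held in memory
    with open(dest_path, 'wb') as out:
        while True:
            chunk = file.stream.read(chunk_size)
            if not chunk:
                break
            out.write(chunk)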

2. Asynchronous processing

Use an asynchronous task queue for the time-consuming work, so the upload request returns quickly:

import os

from celery import Celery
from flask import Flask, request
from werkzeug.utils import secure_filename

app = Flask(__name__)
app.config['UPLOAD_FOLDER'] = '/tmp/uploads'
celery = Celery(app.name, broker='redis://localhost:6379')

@celery.task
def process_images_task(file_paths):
    # Process the images in the background (compression, format conversion, etc.)
    results = []
    for file_path in file_paths:
        # Per-image processing logic (process_single_image is application-specific)
        result = process_single_image(file_path)
        results.append(result)
    return results

@app.route('/upload', methods=['POST'])
def upload_images():
    file_paths = []
    for file in request.files.getlist("images"):
        filename = secure_filename(file.filename)
        file_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
        file.save(file_path)
        file_paths.append(file_path)
    
    # Hand the heavy processing off to a background worker
    task = process_images_task.delay(file_paths)
    
    return {'status': 'success', 'task_id': task.id}
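
Since the route only returns a task_id, clients usually poll a status endpoint for the outcome. A minimal sketch, assuming a Celery result backend is also configured (e.g. backend='redis://localhost:6379/1'); the route path is illustrative:

@app.route('/upload/status/<task_id>', methods=['GET'])
def upload_status(task_id):
    # Look up the task by id; this requires a configured result backend
    task = process_images_task.AsyncResult(task_id)
    response = {'state': task.state}
    if task.state == 'SUCCESS':
        response['results'] = task.result
    elif task.state == 'FAILURE':
        response['error'] = str(task.result)
    return response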

3. Memory mapping and temporary files

Use temporary files plus memory mapping to keep memory usage low while processing large images:

import mmap
import os
import tempfile

def process_large_image(file):
    # Spool the upload to a temporary file instead of holding it in memory
    tmp = tempfile.NamedTemporaryFile(delete=False)
    tmp.close()  # close the handle so file.save() can reopen the path (portable on Windows too)
    try:
        file.save(tmp.name)
        
        # Memory-map the file so large images are paged in on demand
        with open(tmp.name, 'r+b') as f:
            with mmap.mmap(f.fileno(), 0) as mmapped_file:
                # Operate on the mapped bytes without reading the whole file
                process_mapped_data(mmapped_file)
    finally:
        # Clean up the temporary file
        os.unlink(tmp.name)

4. Database optimization

Use batch inserts and connection pooling when recording image metadata:

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

# Create an engine with a connection pool
engine = create_engine(
    'postgresql://user:password@localhost/db',
    pool_size=20,
    max_overflow=30
)
Session = sessionmaker(bind=engine)

def batch_insert_image_records(image_data_list):
    # ImageModel is the ORM model for image metadata (defined elsewhere)
    session = Session()
    try:
        # Insert all records in a single bulk operation
        session.bulk_insert_mappings(ImageModel, image_data_list)
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()
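
For completeness, a hypothetical ImageModel and a call site could look like the following sketch; the table and column names are assumptions, not part of the original code:

from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class ImageModel(Base):
    __tablename__ = 'images'
    id = Column(Integer, primary_key=True)
    filename = Column(String(255), nullable=False)
    url = Column(String(1024))

# Usage: pass plain dicts whose keys match the model's columns
batch_insert_image_records([
    {'filename': 'a.jpg', 'url': 'https://example.com/a.jpg'},
    {'filename': 'b.jpg', 'url': 'https://example.com/b.jpg'},
])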

5. Caching strategy

Use a cache such as Redis to avoid reprocessing identical images:

import redis
import hashlib
import json

redis_client = redis.Redis(host='localhost', port=6379, db=0)

def get_cached_result(file_hash):
    cached = redis_client.get(f"image_result:{file_hash}")
    return json.loads(cached) if cached else None

def cache_result(file_hash, result):
    redis_client.setex(
        f"image_result:{file_hash}",
        3600,  # expire after 1 hour
        json.dumps(result)
    )

def process_image_with_cache(file):
    file_content = file.read()
    file_hash = hashlib.md5(file_content).hexdigest()
    
    # Check the cache first
    cached_result = get_cached_result(file_hash)
    if cached_result:
        return cached_result
    
    # Cache miss: process the image (process_image_logic is application-specific)
    result = process_image_logic(file_content)
    
    # Store the result for subsequent identical uploads
    cache_result(file_hash, result)
    return result
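
Note that file.read() above pulls the whole upload into memory just to hash it, which works against the streaming goal from section 1. If that matters, the digest can be computed incrementally over the stream instead. A minimal sketch, assuming a seekable werkzeug FileStorage stream; the helper name and chunk size are illustrative:

def hash_file_in_chunks(file, chunk_size=1024 * 1024):
    # Feed the hash 1 MB at a time so the full file never sits in memory
    digest = hashlib.md5()
    while True:
        chunk = file.stream.read(chunk_size)
        if not chunk:
            break
        digest.update(chunk)
    file.stream.seek(0)  # rewind so the file can still be saved afterwards
    return digest.hexdigest()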

6. Compression and format optimization

Further optimize the images on the server side by downscaling and re-encoding them:

from PIL import Image
import io

def optimize_image(file, max_size=(1920, 1080), quality=85):
    image = Image.open(file)
    
    # JPEG cannot store an alpha channel, so convert RGBA/palette images first
    if image.mode in ('RGBA', 'P'):
        image = image.convert('RGB')
    
    # Downscale in place while preserving the aspect ratio
    image.thumbnail(max_size, Image.LANCZOS)
    
    # Re-encode as an optimized JPEG into an in-memory buffer
    output = io.BytesIO()
    image.save(output, format='JPEG', quality=quality, optimize=True)
    output.seek(0)
    
    return output
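
A sketch of how this could plug into the upload route from section 1; the route name and the .jpg renaming are illustrative assumptions:

@app.route('/upload', methods=['POST'])
def upload_optimized_images():
    saved = []
    for file in request.files.getlist("images"):
        filename = secure_filename(file.filename)
        optimized = optimize_image(file)
        # The output is always JPEG, so normalize the extension
        dest = os.path.join(app.config['UPLOAD_FOLDER'],
                            os.path.splitext(filename)[0] + '.jpg')
        with open(dest, 'wb') as out:
            out.write(optimized.read())
        saved.append(filename)
    return {'status': 'success', 'count': len(saved)}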

7. Rate limiting and concurrency control

Limit how many upload requests each client can issue:

from flask_limiter import Limiter
from flask_limiter.util import get_remote_address

# flask-limiter 3.x takes the key function as the first argument
limiter = Limiter(
    get_remote_address,
    app=app,
    default_limits=["100 per hour"]
)

@app.route('/upload', methods=['POST'])
@limiter.limit("10 per minute")
def upload_images():
    # Upload handling logic
    pass
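
When a client exceeds the limit, Flask returns an HTTP 429. For an API it is usually nicer to answer with JSON; a minimal sketch of such an error handler:

@app.errorhandler(429)
def ratelimit_exceeded(e):
    # Return a JSON body instead of the default HTML error page
    return {'status': 'error', 'message': 'rate limit exceeded, try again later'}, 429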

8. Distributed storage

Offload large volumes of images to object storage such as S3 instead of local disk:

import boto3
from botocore.exceptions import ClientError
from werkzeug.utils import secure_filename

s3_client = boto3.client('s3')

def upload_to_s3(file, bucket, key):
    try:
        # Stream the file object directly to S3 without buffering it fully in memory
        s3_client.upload_fileobj(file, bucket, key)
        return f"https://{bucket}.s3.amazonaws.com/{key}"
    except ClientError as e:
        print(f"Error uploading to S3: {e}")
        return None

def batch_upload_to_s3(files, bucket):
    urls = []
    for file in files:
        key = f"images/{secure_filename(file.filename)}"
        url = upload_to_s3(file, bucket, key)
        if url:
            urls.append(url)
    return urls
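
For very large images, upload_fileobj can be tuned with a transfer configuration that enables multipart uploads and parallel part transfers. A sketch, with the threshold and concurrency values chosen arbitrarily:

from boto3.s3.transfer import TransferConfig

transfer_config = TransferConfig(
    multipart_threshold=8 * 1024 * 1024,  # switch to multipart above 8 MB
    max_concurrency=10                    # parallel part uploads per file
)

def upload_large_to_s3(file, bucket, key):
    s3_client.upload_fileobj(file, bucket, key, Config=transfer_config)
    return f"https://{bucket}.s3.amazonaws.com/{key}"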

9. Response optimization

Use streaming responses (plus response compression) so the client receives results incrementally:

from flask import Response, stream_with_context
import json

@app.route('/upload', methods=['POST'])
def upload_images_stream():
    def generate():
        yield '{"status": "processing", "files": ['
        
        files = request.files.getlist("images")
        for i, file in enumerate(files):
            # Process each file and stream its result back immediately
            result = process_file(file)
            yield json.dumps(result)
            if i < len(files) - 1:
                yield ","
        
        yield ']}'
    
    # stream_with_context keeps the request context alive while the generator runs
    return Response(stream_with_context(generate()), mimetype='application/json')

10. Monitoring and error handling

Integrate monitoring metrics and error handling into the upload path:

import logging
from prometheus_client import Counter, Histogram

# Define monitoring metrics
upload_counter = Counter('image_uploads_total', 'Total image uploads')
upload_duration = Histogram('image_upload_duration_seconds', 'Image upload duration')

@app.route('/upload', methods=['POST'])
@upload_duration.time()
def upload_images():
    try:
        files = request.files.getlist("images")
        upload_counter.inc(len(files))
        
        # Processing logic (process_files is application-specific)
        results = process_files(files)
        
        return {'status': 'success', 'count': len(results)}
    except Exception as e:
        logging.error(f"Upload error: {e}")
        return {'status': 'error', 'message': str(e)}, 500
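
For Prometheus to scrape these metrics, the app also needs to expose them. A minimal sketch using prometheus_client's default registry and the Flask Response class imported in section 9:

from prometheus_client import generate_latest, CONTENT_TYPE_LATEST

@app.route('/metrics')
def metrics():
    # Expose all registered metrics in the Prometheus text format
    return Response(generate_latest(), mimetype=CONTENT_TYPE_LATEST)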

11. Database connection pool tuning

Fine-tune the connection pool configuration:

from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool

engine = create_engine(
    'postgresql://user:password@localhost/db',
    poolclass=QueuePool,
    pool_size=20,          # base number of pooled connections
    max_overflow=30,       # extra connections allowed beyond pool_size
    pool_recycle=3600,     # recycle connections after this many seconds
    pool_pre_ping=True,    # verify connections are alive before using them
    pool_timeout=30        # seconds to wait for a connection from the pool
)