【Python 操作 MySQL 数据库】

发布于:2025-05-17 ⋅ 阅读:(14) ⋅ 点赞:(0)

在 Python 中操作 MySQL 数据库主要通过 pymysqlmysql-connector-python 库实现。以下是完整的技术指南,包含连接管理、CRUD 操作和最佳实践:


一、环境准备

1. 安装驱动库
pip install pymysql          # 推荐(纯Python实现)
# 或
pip install mysql-connector-python  # Oracle官方驱动
2. 数据库准备
CREATE DATABASE testdb;
USE testdb;

CREATE TABLE users (
    id INT AUTO_INCREMENT PRIMARY KEY,
    username VARCHAR(50) UNIQUE NOT NULL,
    email VARCHAR(100) NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

二、连接管理

1. 基础连接
import pymysql

# 建立连接
conn = pymysql.connect(
    host='localhost',
    user='root',
    password='your_password',
    database='testdb',
    charset='utf8mb4',
    cursorclass=pymysql.cursors.DictCursor  # 返回字典格式结果
)
2. 上下文管理器(推荐)
with pymysql.connect(host='localhost', user='root', password='your_pwd', database='testdb') as conn:
    with conn.cursor() as cursor:
        # 执行SQL操作
        pass
    # 连接在此处自动提交/回滚(取决于autocommit设置)

三、CRUD 操作

1. 查询数据
def get_user(username):
    try:
        with conn.cursor() as cursor:
            sql = "SELECT * FROM users WHERE username = %s"
            cursor.execute(sql, (username,))
            result = cursor.fetchone()  # 获取单条记录
            return result
    except pymysql.MySQLError as e:
        print(f"数据库错误: {e}")
        return None

user = get_user("john_doe")
print(user)  # 输出: {'id': 1, 'username': 'john_doe', ...}
2. 插入数据
def create_user(username, email):
    try:
        with conn.cursor() as cursor:
            sql = """INSERT INTO users 
                    (username, email) 
                    VALUES (%s, %s)"""
            cursor.execute(sql, (username, email))
            conn.commit()  # 显式提交事务
            return cursor.lastrowid  # 返回自增ID
    except pymysql.IntegrityError:
        print("用户名已存在")
        conn.rollback()
        return None

new_id = create_user("jane_smith", "jane@example.com")
3. 更新数据
def update_email(user_id, new_email):
    try:
        with conn.cursor() as cursor:
            sql = "UPDATE users SET email = %s WHERE id = %s"
            affected_rows = cursor.execute(sql, (new_email, user_id))
            conn.commit()
            return affected_rows > 0
    except pymysql.MySQLError:
        conn.rollback()
        return False
4. 批量操作
def batch_insert(users):
    try:
        with conn.cursor() as cursor:
            sql = "INSERT INTO users (username, email) VALUES (%s, %s)"
            cursor.executemany(sql, users)  # 批量执行
            conn.commit()
            return cursor.rowcount
    except pymysql.MySQLError:
        conn.rollback()
        return 0

batch_insert([
    ("user1", "u1@test.com"),
    ("user2", "u2@test.com")
])

四、高级技巧

1. 连接池管理
from dbutils.pooled_db import PooledDB

pool = PooledDB(
    creator=pymysql,
    maxconnections=10,
    host='localhost',
    user='root',
    password='your_pwd',
    database='testdb'
)

# 使用连接池获取连接
conn = pool.connection()
2. 存储过程调用
with conn.cursor() as cursor:
    cursor.callproc('get_user_stats', (1,))  # 调用存储过程
    result = cursor.fetchall()
3. 事务控制
try:
    with conn.cursor() as cursor:
        # 执行多个操作
        cursor.execute("UPDATE account SET balance = balance - 100 WHERE id = 1")
        cursor.execute("UPDATE account SET balance = balance + 100 WHERE id = 2")
        conn.commit()  # 显式提交事务
except:
    conn.rollback()  # 回滚所有操作

五、安全实践

1. 防止SQL注入
  • 永远使用参数化查询(不要拼接字符串)
    # 正确方式
    cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))
    
    # 危险方式(禁用!)
    cursor.execute(f"SELECT * FROM users WHERE id = {user_id}")
    
2. 敏感信息处理
  • 使用环境变量存储密码:
    import os
    password = os.getenv('DB_PASSWORD')
    
3. 连接超时设置
conn = pymysql.connect(
    connect_timeout=5,  # 连接超时5秒
    read_timeout=10     # 读取超时10秒
)

六、性能优化

1. 查询优化
  • 添加索引:

    ALTER TABLE users ADD INDEX idx_email (email);
    
  • 使用EXPLAIN分析查询:

    with conn.cursor() as cursor:
        cursor.execute("EXPLAIN SELECT * FROM users WHERE username = %s", ("john",))
        print(cursor.fetchall())
    
2. 结果集处理
  • 分页查询:

    sql = "SELECT * FROM users LIMIT %s OFFSET %s"
    cursor.execute(sql, (page_size, (page-1)*page_size))
    
  • 流式读取(大数据量):

    with conn.cursor(pymysql.cursors.SSCursor) as cursor:  # 使用服务器端游标
        cursor.execute("SELECT * FROM large_table")
        for row in cursor:
            process(row)
    

七、完整示例

import pymysql
from contextlib import contextmanager

@contextmanager
def database_connection():
    conn = pymysql.connect(
        host='localhost',
        user='root',
        password='your_pwd',
        database='testdb',
        charset='utf8mb4'
    )
    try:
        yield conn
    finally:
        conn.close()

def main():
    with database_connection() as conn:
        with conn.cursor() as cursor:
            # 创建用户
            cursor.execute("""
                INSERT INTO users (username, email)
                VALUES (%s, %s)
            """, ("new_user", "user@example.com"))
            user_id = cursor.lastrowid
            
            # 查询用户
            cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))
            print(cursor.fetchone())
            
            # 更新记录
            cursor.execute("""
                UPDATE users 
                SET email = %s 
                WHERE id = %s
            """, ("new_email@example.com", user_id))
            
            conn.commit()

if __name__ == "__main__":
    main()

八、故障排查

1. 常见错误码
  • 1045: 访问被拒绝(检查用户名/密码)
  • 2003: 无法连接(检查主机/端口)
  • 1062: 唯一键冲突
  • 1146: 表不存在
2. 日志记录
import logging
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(levelname)s - %(message)s'
)

# 在连接参数中添加
conn = pymysql.connect(..., cursorclass=pymysql.cursors.SSDictCursor,
                       client_flag=pymysql.client.CLIENT.MULTI_STATEMENTS)

通过遵循这些实践,可以构建安全、高效的数据库交互应用。对于复杂场景,建议结合ORM框架(如SQLAlchemy)进行抽象层开发。


网站公告

今日签到

点亮在社区的每一天
去签到