实验六-使用PyMySQL数据存储的Flask登录系统
一、实验目的和任务
- 理解Web应用用户认证的基本流程和实现原理
- 掌握使用PyMySQL连接和操作MySQL数据库的方法
- 实现基于数据库的用户注册/登录功能
- 分析明文存储密码的安全隐患
二、实验内容
PyMySQL基础操作:数据库连接池配置、游标对象的使用、SQL语句执行与结果处理
Flask集成PyMySQL:应用上下文管理、请求生命周期中的连接处理、错误处理与事务回滚
用户系统实现:注册功能、登录功能、用户查询功能
三、实验步骤
1. 数据库准备(db_init.sql)
/*
* 实验六数据库初始化脚本 - Navicat版
* 在Navicat中执行方法:
* 1. 连接到目标MySQL服务器
* 2. 点击"查询"->"新建查询"
* 3. 复制本脚本内容到查询窗口
* 4. 点击"运行"执行脚本
*/
-- 创建数据库(如果不存在)
CREATE DATABASE IF NOT EXISTS flask_auth
CHARACTER SET utf8mb4
COLLATE utf8mb4_general_ci;
-- 使用数据库
USE flask_auth;
-- 创建用户表
CREATE TABLE IF NOT EXISTS users (
id INT AUTO_INCREMENT PRIMARY KEY COMMENT '用户ID',
username VARCHAR(50) UNIQUE NOT NULL COMMENT '用户名',
password VARCHAR(100) NOT NULL COMMENT '密码(加密存储)',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='用户信息表';
-- 验证数据库和表是否创建成功
SELECT '数据库和表创建成功' AS 验证结果;
-- 查看表结构(执行此语句验证表是否创建正确)
SELECT
COLUMN_NAME AS '字段名',
COLUMN_TYPE AS '数据类型',
IS_NULLABLE AS '允许空',
COLUMN_KEY AS '键',
COLUMN_DEFAULT AS '默认值',
EXTRA AS '额外信息',
COLUMN_COMMENT AS '注释'
FROM
INFORMATION_SCHEMA.COLUMNS
WHERE
TABLE_SCHEMA = 'flask_auth'
AND TABLE_NAME = 'users';
2. 修改app.py
from flask import Flask, render_template, request, redirect, send_from_directory, url_for, flash, g, session
from markupsafe import Markup
import pymysql
from werkzeug.security import generate_password_hash, check_password_hash
# Navicat执行初始化脚本说明:
# 1. 在Navicat中新建查询
# 2. 打开db_init.sql文件
# 3. 执行脚本创建数据库和表
import os
# 获取当前文件所在目录的绝对路径
base_dir = os.path.dirname(os.path.abspath(__file__))
template_dir = os.path.join(base_dir, 'templates')
app = Flask(__name__, template_folder=template_dir)
app.secret_key = 'your_secret_key_here'#这个是用来加密session的,可以自己设置
# 添加模板文件夹检查
if not os.path.exists(template_dir):
raise Exception(f"模板文件夹不存在: {template_dir}")
if not os.path.isfile(os.path.join(template_dir, 'login.html')):
raise Exception("login.html 文件不存在")
if not os.path.isfile(os.path.join(template_dir, 'register.html')):
raise Exception("register.html 文件不存在")
# 数据库配置 (请根据Navicat连接设置修改以下参数)
DB_CONFIG = {
'host': 'localhost', # Navicat连接的主机地址
'user': 'root', # Navicat连接的用户名
'password': '123456', # Navicat连接的密码
'db': 'flask_auth', # 在Navicat中创建的数据库名
'charset': 'utf8mb4',
'cursorclass': pymysql.cursors.DictCursor,
'port': 3306 # Navicat连接的端口号,默认3306
}
# 数据库连接函数
def get_db():
if 'db' not in g:
g.db = pymysql.connect(**DB_CONFIG)
return g.db
# 应用关闭时关闭数据库连接
@app.teardown_appcontext
def teardown_db(exception):
db = g.pop('db', None)
if db is not None:
db.close()
@app.route('/')
def index():
if 'username' in session:
return render_template('index.html', username=session['username'])
return render_template('index.html')
@app.route('/register', methods=['GET', 'POST'])
def register():
print(f"Debug: 访问注册页面,方法: {request.method}") # 添加调试信息
if request.method == 'POST':
username = request.form['username']
password = request.form['password']
if not username or not password:
flash('用户名和密码不能为空')
else:
db = get_db()
try:
with db.cursor() as cursor:
# 检查用户名是否已存在
cursor.execute("SELECT * FROM users WHERE username = %s", (username,))
if cursor.fetchone():
flash('用户名已存在')
else:
# 密码哈希存储
hashed_password = generate_password_hash(password)
cursor.execute(
"INSERT INTO users (username, password) VALUES (%s, %s)",
(username, hashed_password)
)
db.commit()
flash('注册成功,请登录')
return redirect(url_for('login'))
except Exception as e:
db.rollback()
flash('注册失败,请重试')
app.logger.error(f"注册错误: {str(e)}")
print(f"Debug: 注册模板路径: {os.path.join(app.template_folder, 'register.html')}") # 添加模板路径调试
return render_template('register.html')
@app.route('/login', methods=['GET', 'POST'])
def login():
if request.method == 'POST':
username = request.form['username']
password = request.form['password']
db = get_db()
try:
with db.cursor() as cursor:
cursor.execute("SELECT * FROM users WHERE username = %s", (username,))
user = cursor.fetchone()
if not user:
flash('用户名不存在')
elif not check_password_hash(user['password'], password):
flash('密码错误')
else:
session['username'] = username
flash('登录成功')
return redirect(url_for('index'))
except Exception as e:
flash('登录失败,请重试')
app.logger.error(f"登录错误: {str(e)}")
return render_template('login.html')
@app.route('/logout')
def logout():
session.pop('username', None)
return redirect(url_for('index'))
# 静态文件路由
@app.route('/static/<path:filename>')
def static_files(filename):
return send_from_directory(os.path.join(app.root_path, 'static'), filename)
# favicon处理
@app.route('/favicon.ico')
def favicon():
return '', 204 # 返回空内容避免404错误
# 优化错误处理
@app.errorhandler(404)
def page_not_found(e):
return render_template('404.html'), 404
@app.errorhandler(500)
def internal_error(e):
return render_template('500.html'), 500
if __name__ == '__main__':
# 启用详细调试信息
app.config['TRAP_HTTP_EXCEPTIONS'] = True
app.config['EXPLAIN_TEMPLATE_LOADING'] = True
# 打印所有路由信息
print("可用路由:")
for rule in app.url_map.iter_rules():
print(f"{rule.endpoint}: {rule.rule}")
# 打印模板配置信息
print(f"\n模板文件夹: {app.template_folder}")
print(f"静态文件夹: {app.static_folder}")
app.run(debug=True, host='0.0.0.0', port=5000)
3. 其他前端界面参照实验五
4. 功能测试流程
1.注册新用户:访问/register页面、提交用户名和密码(如:test/123456)
2.数据库验证,确认密码以明文形式存储:
3.登录测试:访问/login页面、使用相同凭证登录、验证登录是否成功
四、思考题
1.示例代码中为什么使用参数化查询(%s)而不是字符串拼接?
技术原理:
查询与数据分离处理机制
预编译语句(PreparedStatement)实现
自动类型安全检测
安全对比实验:
# 安全示例(参数化)
cursor.execute("SELECT * FROM users WHERE username=%s AND password=%s",
(username, pwd_hash))
# 危险示例(拼接) - 易受SQL注入攻击
sql = f"SELECT * FROM users WHERE username='{username}' AND password='{pwd_hash}'"
cursor.execute(sql) # 当username输入"admin'--"时会产生漏洞
2.PyMySQL的cursorclass参数有什么作用?
游标类型返回格式内存占用适用场景
Cursor 元组 低 简单查询
DictCursor 字典 中 API开发
SSCursor 元组(流式) 极低 大数据量
SSDictCursor 字典(流式) 低 大数据量API
配置示例:
conn = pymysql.connect(
cursorclass=pymysql.cursors.SSDictCursor, # 推荐生产环境使用
host='mysql.example.com',
user='app_user',
password='encrypted_password',
db='app_db',
charset='utf8mb4'
)
3.什么情况下需要手动调用commit()?
必须手动提交的场景:
数据变更操作(DML)后
多步骤事务完成时
重要业务操作完成后
事务模板:
try:
conn.begin() # 显式开始事务
with conn.cursor() as cursor:
# 操作1:扣减库存
cursor.execute("""
UPDATE products SET stock=stock-%s
WHERE id=%s AND stock>=%s
""", (quantity, product_id, quantity))
# 操作2:创建订单
cursor.execute("""
INSERT INTO orders VALUES
(NULL, %s, %s, %s, NOW())
""", (user_id, product_id, quantity))
conn.commit() # 显式提交
except Exception as e:
conn.rollback() # 失败回滚
logger.error(f"Transaction failed: {str(e)}")
raise
finally:
conn.close() # 确保连接关闭
4.PyMySQL执行INSERT和SELECT语句时有哪些安全注意事项?
INSERT安全规范:
使用参数化查询
验证数据完整性
处理唯一约束异常
限制批量插入数量
SELECT防御措施:
# 安全的分页查询实现
def get_paginated_data(page, size):
size = min(100, max(1, size)) # 限制每页最大100条
offset = (max(1, page) - 1) * size
with conn.cursor() as cursor:
cursor.execute("""
SELECT id, name FROM products
WHERE status=1
ORDER BY create_time DESC
LIMIT %s OFFSET %s
""", (size, offset))
return cursor.fetchall()
5.为什么要在except块中调用rollback()?
关键作用:
保证事务原子性
避免脏数据残留
释放数据库锁资源
维持连接池健康
错误处理模式:
try:
# 业务操作...
except pymysql.err.IntegrityError:
con
实验七-集成Flask-SocketIO的实时通信系统
一、实验目的和任务
理解WebSocket通信协议与HTTP协议的区别
掌握Flask-SocketIO的集成与实时通信实现
实现用户在线状态实时同步与消息推送功能
实验任务
在实验六代码基础上(登录注册基础上)集成Socket.IO功能
实现以下核心功能:
实时聊天消息广播
动态在线用户列表
用户登录/登出状态通知
二、实验内容
新增技术组件
Flask-SocketIO服务端集成
Socket.IO客户端开发
双向事件通信机制(emit/on)
会话状态维持与广播机制
前后端实时数据同步
三、实验步骤
1. 环境准备
# 安装新依赖
pip install flask-socketio eventlet
2. 修改工程结构
├── app.py # 主程序(升级)
├── static
│ ├── style.css # 原样式文件
│ └── chat.js # 新增Socket.IO客户端逻辑
└── templates
├── home.html # 升级聊天界面
├── login.html # 原登录页面
└── register.html # 原注册页面
3. 升级app.py (服务端核心修改)
import eventlet
eventlet.monkey_patch()
from flask import Flask, render_template, request, redirect, send_from_directory, url_for, flash, g, session
from markupsafe import Markup
import pymysql
from werkzeug.security import generate_password_hash, check_password_hash
from flask_socketio import SocketIO, emit, join_room
from datetime import datetime
import os
# 获取当前文件所在目录的绝对路径
base_dir = os.path.dirname(os.path.abspath(__file__))
template_dir = os.path.join(base_dir, 'templates')
app = Flask(__name__, template_folder=template_dir)
app.secret_key = 'your_secret_key_here' # 这个是用来加密session的,可以自己设置
# Socket.IO配置
socketio = SocketIO(app, async_mode='eventlet', cors_allowed_origins="*")
# 在线用户管理
online_users = set()
# 添加模板文件夹检查
if not os.path.exists(template_dir):
raise Exception(f"模板文件夹不存在: {template_dir}")
if not os.path.isfile(os.path.join(template_dir, 'login.html')):
raise Exception("login.html 文件不存在")
if not os.path.isfile(os.path.join(template_dir, 'register.html')):
raise Exception("register.html 文件不存在")
# 数据库配置 (请根据Navicat连接设置修改以下参数)
DB_CONFIG = {
'host': 'localhost', # Navicat连接的主机地址
'user': 'root', # Navicat连接的用户名
'password': '123456', # Navicat连接的密码
'db': 'fyt', # 在Navicat中创建的数据库名
'charset': 'utf8mb4',
'cursorclass': pymysql.cursors.DictCursor,
'port': 3306 # Navicat连接的端口号,默认3306
}
# 数据库连接函数
def get_db():
if 'db' not in g:
g.db = pymysql.connect(**DB_CONFIG)
return g.db
# 应用关闭时关闭数据库连接
@app.teardown_appcontext
def teardown_db(exception):
db = g.pop('db', None)
if db is not None:
db.close()
@app.route('/')
def index():
if 'username' in session:
current_user = {'username': session['username'], 'is_admin': False} # 根据你的实际情况定义 is_admin 等属性
return render_template('index.html', username=session['username'], current_user=current_user)
return render_template('index.html')
@app.route('/register', methods=['GET', 'POST'])
def register():
print(f"Debug: 访问注册页面,方法: {request.method}") # 添加调试信息
if request.method == 'POST':
username = request.form['username']
password = request.form['password']
if not username or not password:
flash('用户名和密码不能为空')
else:
db = get_db()
try:
with db.cursor() as cursor:
# 检查用户名是否已存在
cursor.execute("SELECT * FROM users WHERE username = %s", (username,))
if cursor.fetchone():
flash('用户名已存在')
else:
# 密码哈希存储
hashed_password = generate_password_hash(password)
cursor.execute(
"INSERT INTO users (username, password) VALUES (%s, %s)",
(username, hashed_password)
)
db.commit()
flash('注册成功,请登录')
return redirect(url_for('login'))
except Exception as e:
db.rollback()
flash('注册失败,请重试')
app.logger.error(f"注册错误: {str(e)}")
print(f"Debug: 注册模板路径: {os.path.join(app.template_folder, 'register.html')}") # 添加模板路径调试
return render_template('register.html')
@app.route('/login', methods=['GET', 'POST'])
def login():
if request.method == 'POST':
username = request.form['username']
password = request.form['password']
db = get_db()
try:
with db.cursor() as cursor:
cursor.execute("SELECT * FROM users WHERE username = %s", (username,))
user = cursor.fetchone()
if not user:
flash('用户名不存在')
elif not check_password_hash(user['password'], password):
flash('密码错误')
else:
session['username'] = username
flash('登录成功')
return redirect(url_for('index'))
except Exception as e:
flash('登录失败,请重试')
app.logger.error(f"登录错误: {str(e)}")
return render_template('login.html')
# 静态文件路由
@app.route('/static/<path:filename>')
def static_files(filename):
return send_from_directory(os.path.join(app.root_path, 'static'), filename)
# favicon处理
@app.route('/favicon.ico')
def favicon():
return '', 204 # 返回空内容避免404错误
# 优化错误处理
@app.errorhandler(404)
def page_not_found(e):
return render_template('404.html'), 404
@app.errorhandler(500)
def internal_error(e):
return render_template('500.html'), 500
# 房间状态管理
active_rooms = {} # {room_id: {'name': room_name, 'users': [username1, ...]}}
private_chats = set() # 存储私信会话 {(user1, user2)}
# Socket.IO事件处理
@socketio.on('connect')
def handle_connect():
if 'username' in session:
username = session['username']
online_users.add(username)
emit('user_join', {'username': username, 'time': datetime.now().strftime('%H:%M:%S')}, broadcast=True)
emit('online_users', {'users': list(online_users)}, broadcast=True)
emit('room_list', {'rooms': active_rooms}) # 发送当前房间列表
@socketio.on('disconnect')
def handle_disconnect():
if 'username' in session:
username = session['username']
if username in online_users:
online_users.remove(username)
# 离开所有房间
for room_id, room in active_rooms.items():
if username in room['users']:
room['users'].remove(username)
emit('user_left_room', {'room_id': room_id, 'username': username}, room=room_id)
emit('user_leave', {'username': username, 'time': datetime.now().strftime('%H:%M:%S')}, broadcast=True)
emit('online_users', {'users': list(online_users)}, broadcast=True)
@socketio.on('message')
def handle_message(data):
if 'username' in session:
username = session['username']
message_data = {
'username': username,
'message': data['message'],
'time': datetime.now().strftime('%H:%M:%S')
}
# 判断消息类型
if 'room_id' in data: # 房间消息
room_id = data['room_id']
if room_id in active_rooms and username in active_rooms[room_id]['users']:
emit('new_room_message', {**message_data, 'room_id': room_id}, room=room_id)
elif 'target_user' in data: # 私信
target_user = data['target_user']
if target_user in online_users:
emit('new_private_message', {**message_data, 'target_user': target_user},
room=f"{username}_{target_user}" if f"{username}_{target_user}" in private_chats
else f"{target_user}_{username}")
else: # 公共消息
emit('new_message', message_data, broadcast=True)
# 房间管理事件
@socketio.on('create_room')
def handle_create_room(data):
if 'username' in session:
username = session['username']
room_name = data.get('room_name', '新房间')
# 在实际应用中应该使用数据库生成room_id
room_id = len(active_rooms) + 1
active_rooms[room_id] = {
'name': room_name,
'creator': username,
'users': [username]
}
join_room(str(room_id))
emit('room_created', {
'room_id': room_id,
'room_name': room_name,
'creator': username
}, broadcast=True)
@socketio.on('join_room')
def handle_join_room(data):
if 'username' in session and 'room_id' in data:
username = session['username']
room_id = data['room_id']
if room_id in active_rooms and username not in active_rooms[room_id]['users']:
active_rooms[room_id]['users'].append(username)
join_room(str(room_id))
emit('user_joined_room', {
'room_id': room_id,
'username': username
}, room=room_id)
@socketio.on('leave_room')
def handle_leave_room(data):
if 'username' in session and 'room_id' in data:
username = session['username']
room_id = data['room_id']
if room_id in active_rooms and username in active_rooms[room_id]['users']:
active_rooms[room_id]['users'].remove(username)
leave_room(str(room_id))
emit('user_left_room', {
'room_id': room_id,
'username': username
}, room=room_id)
# 私信功能
@socketio.on('start_private_chat')
def handle_start_private_chat(data):
if 'username' in session and 'target_user' in data:
username = session['username']
target_user = data['target_user']
if target_user in online_users:
chat_id = f"{username}_{target_user}" if username < target_user else f"{target_user}_{username}"
private_chats.add(chat_id)
join_room(chat_id)
emit('private_chat_started', {
'target_user': target_user,
'initiator': username
}, room=chat_id)
# 修改登出路由以处理Socket.IO连接
@app.route('/logout')
def logout():
if 'username' in session:
username = session.pop('username', None)
if username in online_users:
online_users.remove(username)
socketio.emit('user_leave', {'username': username, 'time': datetime.now().strftime('%H:%M:%S')}, broadcast=True)
socketio.emit('online_users', {'users': list(online_users)}, broadcast=True)
return redirect(url_for('index'))
if __name__ == '__main__':
# 启用详细调试信息
app.config['TRAP_HTTP_EXCEPTIONS'] = True
app.config['EXPLAIN_TEMPLATE_LOADING'] = True
# 打印所有路由信息
print("可用路由:")
for rule in app.url_map.iter_rules():
print(f"{rule.endpoint}: {rule.rule}")
# 打印模板配置信息
print(f"\n模板文件夹: {app.template_folder}")
print(f"静态文件夹: {app.static_folder}")
socketio.run(app, debug=True, host='127.0.0.1', port=5001)
4. 新增chat.js (客户端通信逻辑)
console.log("chat.js已加载");
const socket = io();
// 当前房间和私信状态
let currentRoom = null;
let privateChats = {};
// 调试Socket连接状态
socket.on('connect', () => {
console.log('已连接到Socket.IO服务器');
});
socket.on('disconnect', () => {
console.log('已断开与Socket.IO服务器的连接');
});
// 消息显示函数
function displayMessage(data, type = 'public') {
const chatArea = document.getElementById('chat-area');
const messageElement = document.createElement('div');
// 设置不同消息类型的样式
messageElement.className = `message ${type}-message`;
// 根据消息类型构建内容
let content = '';
if (type === 'room') {
content = `[房间 ${data.room_id}] <span class="username">${data.username}</span>`;
} else if (type === 'private') {
content = `[私信] <span class="username">${data.username}</span>`;
} else {
content = `<span class="username">${data.username}</span>`;
}
messageElement.innerHTML = `
${content}
<span class="time">${data.time}</span>
<div class="content">${data.message}</div>
`;
chatArea.appendChild(messageElement);
chatArea.scrollTop = chatArea.scrollHeight;
}
// 房间相关事件
socket.on('room_created', (room) => {
displayMessage({
username: '系统',
message: `房间 "${room.room_name}" 已创建`,
time: new Date().toLocaleTimeString()
}, 'system');
updateRoomList();
});
// 房间消息处理
socket.on('new_room_message', (data) => {
displayMessage(data, 'room');
});
socket.on('user_joined_room', (data) => {
displayMessage({
username: '系统',
message: `${data.username} 加入了房间`,
time: new Date().toLocaleTimeString()
}, 'system');
});
socket.on('user_left_room', (data) => {
displayMessage({
username: '系统',
message: `${data.username} 离开了房间`,
time: new Date().toLocaleTimeString()
}, 'system');
});
// 更新房间列表
function updateRoomList() {
socket.emit('get_rooms');
}
socket.on('room_list', (data) => {
const roomList = document.getElementById('rooms');
roomList.innerHTML = '';
data.rooms.forEach(room => {
const roomItem = document.createElement('li');
if(currentRoom === room.id) {
roomItem.classList.add('current-room');
roomItem.innerHTML = `
<span class="room-name"><strong>${room.name}</strong> (${room.users.length}人) [当前房间]</span>
<button class="leave-room" data-id="${room.id}">离开房间</button>
`;
} else {
roomItem.innerHTML = `
<span class="room-name">${room.name} (${room.users.length}人)</span>
<button class="join-room" data-id="${room.id}">加入房间</button>
`;
}
roomList.appendChild(roomItem);
});
});
// 添加按钮加载状态
function setButtonLoading(button, isLoading) {
if(isLoading) {
button.disabled = true;
button.innerHTML = button.innerHTML.replace(/>([^<]+)</, '>处理中...<');
} else {
button.disabled = false;
if(button.classList.contains('join-room')) {
button.innerHTML = button.innerHTML.replace(/>([^<]+)</, '>加入房间<');
} else if(button.classList.contains('leave-room')) {
button.innerHTML = button.innerHTML.replace(/>([^<]+)</, '>离开房间<');
}
}
}
// 私信功能
function openPrivateChat(user) {
if (!privateChats[user]) {
const pmWindow = document.createElement('div');
pmWindow.className = 'pm-window';
pmWindow.dataset.target = user;
pmWindow.innerHTML = `
<div class="pm-header">与 ${user} 的私信</div>
<div class="pm-messages"></div>
<input type="text" class="pm-input" placeholder="输入私信...">
<button class="pm-send">发送</button>
`;
document.getElementById('pm-windows').appendChild(pmWindow);
privateChats[user] = true;
socket.emit('start_private_chat', { target_user: user });
}
}
// 事件监听
document.addEventListener('click', (e) => {
// 创建房间
if (e.target.id === 'create-room-button') {
const roomName = document.getElementById('room-name-input').value.trim();
if (roomName) {
socket.emit('create_room', { room_name: roomName });
document.getElementById('room-name-input').value = '';
// 创建房间后延迟更新列表
setTimeout(updateRoomList, 300);
}
}
// 加入房间
if (e.target.classList.contains('join-room')) {
const roomId = e.target.dataset.id;
socket.emit('join_room', { room_id: roomId });
currentRoom = roomId;
// 加入房间后延迟更新列表
setTimeout(updateRoomList, 300);
}
// 离开房间
if (e.target.classList.contains('leave-room')) {
const roomId = e.target.dataset.id;
socket.emit('leave_room', { room_id: roomId }, (response) => {
if (response.success) {
currentRoom = null;
displayMessage({
username: '系统',
message: `你已离开房间`,
time: new Date().toLocaleTimeString()
}, 'system');
updateRoomList();
} else {
displayMessage({
username: '系统',
message: `离开房间失败: ${response.error}`,
time: new Date().toLocaleTimeString()
}, 'system');
}
});
}
// 私信
if (e.target.classList.contains('pm-btn')) {
const user = e.target.dataset.user;
openPrivateChat(user);
}
});
// 发送消息逻辑修改
document.getElementById('send-button').addEventListener('click', sendMessage);
document.getElementById('message-input').addEventListener('keypress', (e) => {
if (e.key === 'Enter') sendMessage();
});
function sendMessage() {
const messageInput = document.getElementById('message-input');
const message = messageInput.value.trim();
if (message) {
if (currentRoom) {
socket.emit('message', {
message,
room_id: currentRoom
});
} else {
socket.emit('message', { message });
}
messageInput.value = '';
}
}
// 初始化
updateRoomList();
6. 新增登出路由(需同步更新用户状态)
@app.route('/logout')
def logout():
"""处理用户登出"""
username = session.pop('username', None)
if username in online_users:
online_users.remove(username)
socketio.emit('user_leave', {'username': username}, broadcast=True)
flash('您已成功登出')
return redirect(url_for('login'))
四、功能测试流程
基础通信测试
打开两个浏览器窗口分别登录不同账号
验证消息实时同步显示
检查控制台错误日志(目前无错误)
状态同步测试(失败)
新用户登录时观察其他客户端的用户列表更新
用户登出时验证状态同步
五、思考题
1.WebSocket连接建立时需要完成哪些握手过程?
WebSocket连接的建立需要经历以下握手过程:
客户端发起请求:客户端通过HTTP请求向服务器发起WebSocket连接请求,请求头中包含Upgrade: websocket和Connection: Upgrade,以及一个随机生成的Sec-WebSocket-Key。
服务器响应:服务器接收到请求后,验证请求头中的信息,并生成一个Sec-WebSocket-Accept值(通过对客户端的Sec-WebSocket-Key进行特定算法处理得到)。服务器返回一个HTTP响应,状态码为101 Switching Protocols,并包含Upgrade: websocket和Connection: Upgrade,以及Sec-WebSocket-Accept。
连接建立:客户端收到服务器的响应后,验证Sec-WebSocket-Accept值是否正确。如果验证通过,则WebSocket连接建立成功,双方可以通过WebSocket协议进行双向通信。
2. 为什么使用eventlet作为异步模式?对比gevent的异。
为什么使用eventlet:
兼容性:eventlet提供了对阻塞I/O操作的透明支持,能够自动将阻塞调用转换为非阻塞调用,适合在现有代码基础上实现异步化。
方便性:eventlet的API设计较为简洁,使用起来相对直观,适合快速开发异步程序。
eventlet与gevent的异同:
相同点:
都基于greenlet实现协程,通过事件循环来处理I/O操作,实现高效的并发处理。
都支持对阻塞I/O操作的协程化处理,能够提升程序的并发性能。
不同点:
API设计:eventlet的API设计更偏向于简洁和直观,而gevent的API设计相对更灵活,提供了更多的底层控制能力。
对阻塞操作的处理:eventlet会自动对阻塞I/O操作进行协程化,而gevent需要显式地对阻塞操作进行包装(如使用gevent.monkey.patch_all())。
社区和生态:gevent的社区相对更活跃,文档和资源更丰富,而eventlet的社区相对较小,但也有一定的用户群体。
3. socketio.run() 与标准 app.run() 有何本质区别?
app.run():
通常用于启动一个标准的Web应用(如Flask应用)。
主要处理HTTP请求和响应,不支持WebSocket协议。
是一个阻塞式的调用,会启动一个简单的开发服务器,适合开发和测试环境。
socketio.run():
用于启动一个支持WebSocket协议的服务器。
不仅可以处理HTTP请求,还可以处理WebSocket连接和消息通信。
内部集成了事件循环机制,支持异步处理WebSocket消息。
通常需要结合Flask-SocketIO等扩展使用,以实现WebSocket功能。
4. 广播消息时如何避免发送者收到重复消息?
在广播消息时,为了避免发送者收到重复消息,可以采取以下方法:
在发送广播消息时排除发送者:
在调用广播函数(如socketio.emit())时,通过skip_sid参数指定跳过发送者的会话ID(sid)。例如:
socketio.emit('message', data, broadcast=True, skip_sid=sender_sid)
这样可以确保消息不会被发送回发送者,避免重复接收。
5. 当前在线用户存储在内存中有何隐患?如何改进为Redis存储?
内存存储的隐患:
数据丢失:当服务器重启或崩溃时,内存中的数据会丢失,导致在线用户信息丢失。
可扩展性差:在多服务器部署的情况下,内存存储无法共享数据,每个服务器只能管理自己的在线用户信息。
资源限制:内存资源有限,随着用户数量的增加,可能会导致内存不足。
改进为Redis存储:
使用Redis存储在线用户信息:
将每个用户的会话信息(如sid、用户名等)存储在Redis中,使用键值对的形式存储。
可以通过Redis的SET命令存储用户信息,通过DEL命令删除用户信息。
import redisr = redis.Redis(host='localhost', port=6379, db=0)
def add_online_user(user_id, sid):
r.hset('online_users', user_id, sid)def remove_online_user(user_id):
r.hdel('online_users', user_id)def get_online_users():
return r.hgetall('online_users')
优点:
数据持久化:Redis支持数据持久化,即使服务器重启,数据也不会丢失。
可扩展性:Redis可以部署在多个服务器上,支持分布式存储,适合大规模应用。
性能高:Redis的读写速度非常快,适合存储在线用户信息。
6. 如何防止客户端发送恶意高频消息?
防止客户端发送恶意高频消息可以采取以下措施:
限制消息频率:
在服务器端设置消息频率限制,例如每秒最多接收一定数量的消息。
可以通过维护一个消息计数器和时间戳来实现频率限制。
示例代码:
from collections import defaultdict
from time import timemessage_limits = defaultdict(lambda: {'count': 0, 'timestamp': time()})
def handle_message(sid, data):
current_time = time()
if current_time - message_limits[sid]['timestamp'] < 1:
message_limits[sid]['count'] += 1
if message_limits[sid]['count'] > 10: # 每秒最多10条消息
return "Message frequency exceeded"
else:
message_limits[sid]['count'] = 1
message_limits[sid]['timestamp'] = current_time
# 处理消息
return "Message received"
使用令牌桶算法或漏桶算法:
令牌桶算法:允许在短时间内突发发送消息,但总体速率受限。
漏桶算法:严格限制消息的发送速率,平滑输出。
这些算法可以通过第三方库实现,也可以自己编写。
7. 消息事件中的 broadcast=True 参数有什么副作用?
broadcast=True 参数用于将消息广播给所有连接的客户端,但它可能会带来以下副作用:
性能问题:如果连接的客户端数量较多,广播消息会占用大量服务器资源,可能导致服务器性能下降。
消息过载:客户端可能会收到大量不必要的消息,导致客户端性能下降,甚至崩溃。
安全性问题:广播消息可能会将敏感信息发送给所有客户端,存在安全隐患。
为了避免这些问题,可以:
限制广播范围:通过room参数将消息广播限制在特定的房间内,而不是所有客户端。
优化消息内容:减少不必要的消息广播,只发送必要的信息。
分批广播:将消息分批发送,避免一次性发送大量消息。
8. 如何实现消息历史记录的持久化存储?
实现消息历史记录的持久化存储可以采用以下方法:
使用数据库存储:
将消息存储到关系型数据库(如MySQL、PostgreSQL)或非关系型数据库(如MongoDB、Redis)中。
每条消息可以存储为一条记录,包含消息内容、发送者、接收者、时间戳等信息。
示例代码(使用SQLite):
import sqlite3conn = sqlite3.connect('messages.db')
cursor = conn.cursor()cursor.execute('''
CREATE TABLE IF NOT EXISTS messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
sender TEXT NOT NULL,
receiver TEXT NOT NULL,
content TEXT NOT NULL,
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
)
''')def save_message(sender, receiver, content):
cursor.execute('''
INSERT INTO messages (sender, receiver, content) VALUES (?, ?, ?)
''', (sender, receiver, content))
conn.commit()def get_message_history():
cursor.execute('SELECT * FROM messages')
return cursor.fetchall()
使用文件存储:
将消息以文本文件或JSON文件的形式存储到磁盘上。
适合轻量级应用,但不适合大规模数据存储。
使用消息队列:
将消息发送到消息队列(如RabbitMQ、Kafka)中,由其他服务负责处理和持久化存储。
适合高并发和分布式系统。
通过以上方法,可以实现消息历史记录的持久化存储,确保消息不会丢失,并支持后续的查询和分析。