实验六-使用PyMySQL数据存储的Flask登录系统-实验七-集成Flask-SocketIO的实时通信系统

发布于:2025-07-08 ⋅ 阅读:(21) ⋅ 点赞:(0)

实验六-使用PyMySQL数据存储的Flask登录系统

一、实验目的和任务

  1. 理解Web应用用户认证的基本流程和实现原理
  2. 掌握使用PyMySQL连接和操作MySQL数据库的方法
  3. 实现基于数据库的用户注册/登录功能
  4. 分析明文存储密码的安全隐患

二、实验内容

        PyMySQL基础操作:数据库连接池配置、游标对象的使用、SQL语句执行与结果处理

        Flask集成PyMySQL:应用上下文管理、请求生命周期中的连接处理、错误处理与事务回滚

        用户系统实现:注册功能、登录功能、用户查询功能

三、实验步骤

1. 数据库准备(db_init.sql)

/*
 * 实验六数据库初始化脚本 - Navicat版
 * 在Navicat中执行方法:
 * 1. 连接到目标MySQL服务器
 * 2. 点击"查询"->"新建查询"
 * 3. 复制本脚本内容到查询窗口
 * 4. 点击"运行"执行脚本
 */

-- 创建数据库(如果不存在)
CREATE DATABASE IF NOT EXISTS flask_auth 
CHARACTER SET utf8mb4 
COLLATE utf8mb4_general_ci;

-- 使用数据库
USE flask_auth;

-- 创建用户表
CREATE TABLE IF NOT EXISTS users (
    id INT AUTO_INCREMENT PRIMARY KEY COMMENT '用户ID',
    username VARCHAR(50) UNIQUE NOT NULL COMMENT '用户名',
    password VARCHAR(100) NOT NULL COMMENT '密码(加密存储)',
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='用户信息表';

-- 验证数据库和表是否创建成功
SELECT '数据库和表创建成功' AS 验证结果;

-- 查看表结构(执行此语句验证表是否创建正确)
SELECT 
    COLUMN_NAME AS '字段名',
    COLUMN_TYPE AS '数据类型',
    IS_NULLABLE AS '允许空',
    COLUMN_KEY AS '键',
    COLUMN_DEFAULT AS '默认值',
    EXTRA AS '额外信息',
    COLUMN_COMMENT AS '注释'
FROM 
    INFORMATION_SCHEMA.COLUMNS 
WHERE 
    TABLE_SCHEMA = 'flask_auth' 
    AND TABLE_NAME = 'users';

2. 修改app.py

from flask import Flask, render_template, request, redirect, send_from_directory, url_for, flash, g, session
from markupsafe import Markup
import pymysql
from werkzeug.security import generate_password_hash, check_password_hash

# Navicat执行初始化脚本说明:
# 1. 在Navicat中新建查询
# 2. 打开db_init.sql文件
# 3. 执行脚本创建数据库和表

import os

# 获取当前文件所在目录的绝对路径
base_dir = os.path.dirname(os.path.abspath(__file__))
template_dir = os.path.join(base_dir, 'templates')

app = Flask(__name__, template_folder=template_dir)
app.secret_key = 'your_secret_key_here'#这个是用来加密session的,可以自己设置

# 添加模板文件夹检查
if not os.path.exists(template_dir):
    raise Exception(f"模板文件夹不存在: {template_dir}")
if not os.path.isfile(os.path.join(template_dir, 'login.html')):
    raise Exception("login.html 文件不存在")
if not os.path.isfile(os.path.join(template_dir, 'register.html')):
    raise Exception("register.html 文件不存在")

# 数据库配置 (请根据Navicat连接设置修改以下参数)
DB_CONFIG = {
    'host': 'localhost',      # Navicat连接的主机地址
    'user': 'root',           # Navicat连接的用户名
    'password': '123456',  # Navicat连接的密码
    'db': 'flask_auth',       # 在Navicat中创建的数据库名
    'charset': 'utf8mb4',
    'cursorclass': pymysql.cursors.DictCursor,
    'port': 3306              # Navicat连接的端口号,默认3306
}
# 数据库连接函数
def get_db():
    if 'db' not in g:
        g.db = pymysql.connect(**DB_CONFIG)
    return g.db
# 应用关闭时关闭数据库连接
@app.teardown_appcontext
def teardown_db(exception):
    db = g.pop('db', None)
    if db is not None:
        db.close()
@app.route('/')
def index():
    if 'username' in session:
        return render_template('index.html', username=session['username'])
    return render_template('index.html')
@app.route('/register', methods=['GET', 'POST'])
def register():
    print(f"Debug: 访问注册页面,方法: {request.method}")  # 添加调试信息
    if request.method == 'POST':
        username = request.form['username']
        password = request.form['password']
        
        if not username or not password:
            flash('用户名和密码不能为空')
        else:
            db = get_db()
            try:
                with db.cursor() as cursor:
                    # 检查用户名是否已存在
                    cursor.execute("SELECT * FROM users WHERE username = %s", (username,))
                    if cursor.fetchone():
                        flash('用户名已存在')
                    else:
                        # 密码哈希存储
                        hashed_password = generate_password_hash(password)
                        cursor.execute(
                            "INSERT INTO users (username, password) VALUES (%s, %s)",
                            (username, hashed_password)
                        )
                        db.commit()
                        flash('注册成功,请登录')
                        return redirect(url_for('login'))
            except Exception as e:
                db.rollback()
                flash('注册失败,请重试')
                app.logger.error(f"注册错误: {str(e)}")

    print(f"Debug: 注册模板路径: {os.path.join(app.template_folder, 'register.html')}")  # 添加模板路径调试
    return render_template('register.html')
@app.route('/login', methods=['GET', 'POST'])
def login():
    if request.method == 'POST':
        username = request.form['username']
        password = request.form['password']
        
        db = get_db()
        try:
            with db.cursor() as cursor:
                cursor.execute("SELECT * FROM users WHERE username = %s", (username,))
                user = cursor.fetchone()
                
                if not user:
                    flash('用户名不存在')
                elif not check_password_hash(user['password'], password):
                    flash('密码错误')
                else:
                    session['username'] = username
                    flash('登录成功')
                    return redirect(url_for('index'))
        except Exception as e:
            flash('登录失败,请重试')
            app.logger.error(f"登录错误: {str(e)}")
    
    return render_template('login.html')

@app.route('/logout')
def logout():
    session.pop('username', None)
    return redirect(url_for('index'))

# 静态文件路由
@app.route('/static/<path:filename>')
def static_files(filename):
    return send_from_directory(os.path.join(app.root_path, 'static'), filename)

# favicon处理
@app.route('/favicon.ico')
def favicon():
    return '', 204  # 返回空内容避免404错误

# 优化错误处理
@app.errorhandler(404)
def page_not_found(e):
    return render_template('404.html'), 404

@app.errorhandler(500)
def internal_error(e):
    return render_template('500.html'), 500

if __name__ == '__main__':
    # 启用详细调试信息
    app.config['TRAP_HTTP_EXCEPTIONS'] = True
    app.config['EXPLAIN_TEMPLATE_LOADING'] = True
    
    # 打印所有路由信息
    print("可用路由:")
    for rule in app.url_map.iter_rules():
        print(f"{rule.endpoint}: {rule.rule}")
    
    # 打印模板配置信息
    print(f"\n模板文件夹: {app.template_folder}")
    print(f"静态文件夹: {app.static_folder}")
    
    app.run(debug=True, host='0.0.0.0', port=5000)

3. 其他前端界面参照实验五


4. 功能测试流程

1.注册新用户:访问/register页面、提交用户名和密码(如:test/123456)

2.数据库验证,确认密码以明文形式存储:

3.登录测试:访问/login页面、使用相同凭证登录、验证登录是否成功

四、思考题

1.示例代码中为什么使用参数化查询(%s)而不是字符串拼接?

技术原理:
查询与数据分离处理机制
预编译语句(PreparedStatement)实现
自动类型安全检测
安全对比实验:
# 安全示例(参数化)
cursor.execute("SELECT * FROM users WHERE username=%s AND password=%s", 
              (username, pwd_hash))

# 危险示例(拼接) - 易受SQL注入攻击
sql = f"SELECT * FROM users WHERE username='{username}' AND password='{pwd_hash}'"
cursor.execute(sql)  # 当username输入"admin'--"时会产生漏洞

2.PyMySQL的cursorclass参数有什么作用?

游标类型返回格式内存占用适用场景
Cursor 元组 低 简单查询 
DictCursor 字典 中 API开发 
SSCursor 元组(流式) 极低 大数据量 
SSDictCursor 字典(流式) 低 大数据量API 
配置示例:
conn = pymysql.connect(
    cursorclass=pymysql.cursors.SSDictCursor,  # 推荐生产环境使用
    host='mysql.example.com',
    user='app_user',
    password='encrypted_password',
    db='app_db',
    charset='utf8mb4'
)

3.什么情况下需要手动调用commit()?

必须手动提交的场景:
数据变更操作(DML)后
多步骤事务完成时
重要业务操作完成后
事务模板:
try:
    conn.begin()  # 显式开始事务
    with conn.cursor() as cursor:
        # 操作1:扣减库存
        cursor.execute("""
            UPDATE products SET stock=stock-%s 
            WHERE id=%s AND stock>=%s
            """, (quantity, product_id, quantity))
        
        # 操作2:创建订单
        cursor.execute("""
            INSERT INTO orders VALUES 
            (NULL, %s, %s, %s, NOW())
            """, (user_id, product_id, quantity))
    
    conn.commit()  # 显式提交
except Exception as e:
    conn.rollback()  # 失败回滚
    logger.error(f"Transaction failed: {str(e)}")
    raise
finally:
    conn.close()  # 确保连接关闭

4.PyMySQL执行INSERT和SELECT语句时有哪些安全注意事项?

INSERT安全规范:
使用参数化查询
验证数据完整性
处理唯一约束异常
限制批量插入数量
SELECT防御措施:
# 安全的分页查询实现
def get_paginated_data(page, size):
    size = min(100, max(1, size))  # 限制每页最大100条
    offset = (max(1, page) - 1) * size
    with conn.cursor() as cursor:
        cursor.execute("""
            SELECT id, name FROM products
            WHERE status=1
            ORDER BY create_time DESC
            LIMIT %s OFFSET %s
            """, (size, offset))
        return cursor.fetchall()

5.为什么要在except块中调用rollback()?

关键作用:
保证事务原子性
避免脏数据残留
释放数据库锁资源
维持连接池健康
错误处理模式:
try:
    # 业务操作...
except pymysql.err.IntegrityError:
    con

实验七-集成Flask-SocketIO的实时通信系统

一、实验目的和任务

理解WebSocket通信协议与HTTP协议的区别

掌握Flask-SocketIO的集成与实时通信实现

实现用户在线状态实时同步与消息推送功能

实验任务

在实验六代码基础上(登录注册基础上)集成Socket.IO功能

实现以下核心功能:

实时聊天消息广播

动态在线用户列表

用户登录/登出状态通知

二、实验内容

新增技术组件

Flask-SocketIO服务端集成

Socket.IO客户端开发

双向事件通信机制(emit/on)

会话状态维持与广播机制

前后端实时数据同步

三、实验步骤

1. 环境准备

# 安装新依赖

pip install flask-socketio eventlet

2. 修改工程结构

├── app.py                 # 主程序(升级)

├── static

│   ├── style.css          # 原样式文件

│   └── chat.js            # 新增Socket.IO客户端逻辑

└── templates

    ├── home.html          # 升级聊天界面

    ├── login.html         # 原登录页面

    └── register.html      # 原注册页面

3. 升级app.py (服务端核心修改)

import eventlet
eventlet.monkey_patch()

from flask import Flask, render_template, request, redirect, send_from_directory, url_for, flash, g, session
from markupsafe import Markup
import pymysql
from werkzeug.security import generate_password_hash, check_password_hash
from flask_socketio import SocketIO, emit, join_room
from datetime import datetime
import os

# 获取当前文件所在目录的绝对路径
base_dir = os.path.dirname(os.path.abspath(__file__))
template_dir = os.path.join(base_dir, 'templates')

app = Flask(__name__, template_folder=template_dir)
app.secret_key = 'your_secret_key_here'  # 这个是用来加密session的,可以自己设置

# Socket.IO配置
socketio = SocketIO(app, async_mode='eventlet', cors_allowed_origins="*")

# 在线用户管理
online_users = set()

# 添加模板文件夹检查
if not os.path.exists(template_dir):
    raise Exception(f"模板文件夹不存在: {template_dir}")
if not os.path.isfile(os.path.join(template_dir, 'login.html')):
    raise Exception("login.html 文件不存在")
if not os.path.isfile(os.path.join(template_dir, 'register.html')):
    raise Exception("register.html 文件不存在")

# 数据库配置 (请根据Navicat连接设置修改以下参数)
DB_CONFIG = {
    'host': 'localhost',      # Navicat连接的主机地址
    'user': 'root',           # Navicat连接的用户名
    'password': '123456',     # Navicat连接的密码
    'db': 'fyt',              # 在Navicat中创建的数据库名
    'charset': 'utf8mb4',
    'cursorclass': pymysql.cursors.DictCursor,
    'port': 3306              # Navicat连接的端口号,默认3306
}

# 数据库连接函数
def get_db():
    if 'db' not in g:
        g.db = pymysql.connect(**DB_CONFIG)
    return g.db

# 应用关闭时关闭数据库连接
@app.teardown_appcontext
def teardown_db(exception):
    db = g.pop('db', None)
    if db is not None:
        db.close()

@app.route('/')
def index():
    if 'username' in session:
        current_user = {'username': session['username'], 'is_admin': False}  # 根据你的实际情况定义 is_admin 等属性
        return render_template('index.html', username=session['username'], current_user=current_user)
    return render_template('index.html')

@app.route('/register', methods=['GET', 'POST'])
def register():
    print(f"Debug: 访问注册页面,方法: {request.method}")  # 添加调试信息
    if request.method == 'POST':
        username = request.form['username']
        password = request.form['password']
        
        if not username or not password:
            flash('用户名和密码不能为空')
        else:
            db = get_db()
            try:
                with db.cursor() as cursor:
                    # 检查用户名是否已存在
                    cursor.execute("SELECT * FROM users WHERE username = %s", (username,))
                    if cursor.fetchone():
                        flash('用户名已存在')
                    else:
                        # 密码哈希存储
                        hashed_password = generate_password_hash(password)
                        cursor.execute(
                            "INSERT INTO users (username, password) VALUES (%s, %s)",
                            (username, hashed_password)
                        )
                        db.commit()
                        flash('注册成功,请登录')
                        return redirect(url_for('login'))
            except Exception as e:
                db.rollback()
                flash('注册失败,请重试')
                app.logger.error(f"注册错误: {str(e)}")

    print(f"Debug: 注册模板路径: {os.path.join(app.template_folder, 'register.html')}")  # 添加模板路径调试
    return render_template('register.html')

@app.route('/login', methods=['GET', 'POST'])
def login():
    if request.method == 'POST':
        username = request.form['username']
        password = request.form['password']
        
        db = get_db()
        try:
            with db.cursor() as cursor:
                cursor.execute("SELECT * FROM users WHERE username = %s", (username,))
                user = cursor.fetchone()
                
                if not user:
                    flash('用户名不存在')
                elif not check_password_hash(user['password'], password):
                    flash('密码错误')
                else:
                    session['username'] = username
                    flash('登录成功')
                    return redirect(url_for('index'))
        except Exception as e:
            flash('登录失败,请重试')
            app.logger.error(f"登录错误: {str(e)}")
    
    return render_template('login.html')

# 静态文件路由
@app.route('/static/<path:filename>')
def static_files(filename):
    return send_from_directory(os.path.join(app.root_path, 'static'), filename)

# favicon处理
@app.route('/favicon.ico')
def favicon():
    return '', 204  # 返回空内容避免404错误

# 优化错误处理
@app.errorhandler(404)
def page_not_found(e):
    return render_template('404.html'), 404

@app.errorhandler(500)
def internal_error(e):
    return render_template('500.html'), 500

# 房间状态管理
active_rooms = {}  # {room_id: {'name': room_name, 'users': [username1, ...]}}
private_chats = set()  # 存储私信会话 {(user1, user2)}

# Socket.IO事件处理
@socketio.on('connect')
def handle_connect():
    if 'username' in session:
        username = session['username']
        online_users.add(username)
        emit('user_join', {'username': username, 'time': datetime.now().strftime('%H:%M:%S')}, broadcast=True)
        emit('online_users', {'users': list(online_users)}, broadcast=True)
        emit('room_list', {'rooms': active_rooms})  # 发送当前房间列表

@socketio.on('disconnect')
def handle_disconnect():
    if 'username' in session:
        username = session['username']
        if username in online_users:
            online_users.remove(username)
            # 离开所有房间
            for room_id, room in active_rooms.items():
                if username in room['users']:
                    room['users'].remove(username)
                    emit('user_left_room', {'room_id': room_id, 'username': username}, room=room_id)
            emit('user_leave', {'username': username, 'time': datetime.now().strftime('%H:%M:%S')}, broadcast=True)
            emit('online_users', {'users': list(online_users)}, broadcast=True)

@socketio.on('message')
def handle_message(data):
    if 'username' in session:
        username = session['username']
        message_data = {
            'username': username,
            'message': data['message'],
            'time': datetime.now().strftime('%H:%M:%S')
        }
        
        # 判断消息类型
        if 'room_id' in data:  # 房间消息
            room_id = data['room_id']
            if room_id in active_rooms and username in active_rooms[room_id]['users']:
                emit('new_room_message', {**message_data, 'room_id': room_id}, room=room_id)
        elif 'target_user' in data:  # 私信
            target_user = data['target_user']
            if target_user in online_users:
                emit('new_private_message', {**message_data, 'target_user': target_user}, 
                     room=f"{username}_{target_user}" if f"{username}_{target_user}" in private_chats 
                     else f"{target_user}_{username}")
        else:  # 公共消息
            emit('new_message', message_data, broadcast=True)

# 房间管理事件
@socketio.on('create_room')
def handle_create_room(data):
    if 'username' in session:
        username = session['username']
        room_name = data.get('room_name', '新房间')
        
        # 在实际应用中应该使用数据库生成room_id
        room_id = len(active_rooms) + 1
        active_rooms[room_id] = {
            'name': room_name,
            'creator': username,
            'users': [username]
        }
        join_room(str(room_id))
        emit('room_created', {
            'room_id': room_id,
            'room_name': room_name,
            'creator': username
        }, broadcast=True)

@socketio.on('join_room')
def handle_join_room(data):
    if 'username' in session and 'room_id' in data:
        username = session['username']
        room_id = data['room_id']
        
        if room_id in active_rooms and username not in active_rooms[room_id]['users']:
            active_rooms[room_id]['users'].append(username)
            join_room(str(room_id))
            emit('user_joined_room', {
                'room_id': room_id,
                'username': username
            }, room=room_id)

@socketio.on('leave_room')
def handle_leave_room(data):
    if 'username' in session and 'room_id' in data:
        username = session['username']
        room_id = data['room_id']
        
        if room_id in active_rooms and username in active_rooms[room_id]['users']:
            active_rooms[room_id]['users'].remove(username)
            leave_room(str(room_id))
            emit('user_left_room', {
                'room_id': room_id,
                'username': username
            }, room=room_id)

# 私信功能
@socketio.on('start_private_chat')
def handle_start_private_chat(data):
    if 'username' in session and 'target_user' in data:
        username = session['username']
        target_user = data['target_user']
        
        if target_user in online_users:
            chat_id = f"{username}_{target_user}" if username < target_user else f"{target_user}_{username}"
            private_chats.add(chat_id)
            join_room(chat_id)
            emit('private_chat_started', {
                'target_user': target_user,
                'initiator': username
            }, room=chat_id)

# 修改登出路由以处理Socket.IO连接
@app.route('/logout')
def logout():
    if 'username' in session:
        username = session.pop('username', None)
        if username in online_users:
            online_users.remove(username)
            socketio.emit('user_leave', {'username': username, 'time': datetime.now().strftime('%H:%M:%S')}, broadcast=True)
            socketio.emit('online_users', {'users': list(online_users)}, broadcast=True)
    return redirect(url_for('index'))

if __name__ == '__main__':
    # 启用详细调试信息
    app.config['TRAP_HTTP_EXCEPTIONS'] = True
    app.config['EXPLAIN_TEMPLATE_LOADING'] = True
    
    # 打印所有路由信息
    print("可用路由:")
    for rule in app.url_map.iter_rules():
        print(f"{rule.endpoint}: {rule.rule}")
    
    # 打印模板配置信息
    print(f"\n模板文件夹: {app.template_folder}")
    print(f"静态文件夹: {app.static_folder}")
    
    socketio.run(app, debug=True, host='127.0.0.1', port=5001)

4. 新增chat.js (客户端通信逻辑)

console.log("chat.js已加载");
const socket = io();

// 当前房间和私信状态
let currentRoom = null;
let privateChats = {};

// 调试Socket连接状态
socket.on('connect', () => {
    console.log('已连接到Socket.IO服务器');
});

socket.on('disconnect', () => {
    console.log('已断开与Socket.IO服务器的连接');
});

// 消息显示函数
function displayMessage(data, type = 'public') {
    const chatArea = document.getElementById('chat-area');
    const messageElement = document.createElement('div');
    
    // 设置不同消息类型的样式
    messageElement.className = `message ${type}-message`;
    
    // 根据消息类型构建内容
    let content = '';
    if (type === 'room') {
        content = `[房间 ${data.room_id}] <span class="username">${data.username}</span>`;
    } else if (type === 'private') {
        content = `[私信] <span class="username">${data.username}</span>`;
    } else {
        content = `<span class="username">${data.username}</span>`;
    }
    
    messageElement.innerHTML = `
        ${content}
        <span class="time">${data.time}</span>
        <div class="content">${data.message}</div>
    `;
    chatArea.appendChild(messageElement);
    chatArea.scrollTop = chatArea.scrollHeight;
}

// 房间相关事件
socket.on('room_created', (room) => {
    displayMessage({
        username: '系统',
        message: `房间 "${room.room_name}" 已创建`,
        time: new Date().toLocaleTimeString()
    }, 'system');
    updateRoomList();
});

// 房间消息处理
socket.on('new_room_message', (data) => {
    displayMessage(data, 'room');
});

socket.on('user_joined_room', (data) => {
    displayMessage({
        username: '系统',
        message: `${data.username} 加入了房间`,
        time: new Date().toLocaleTimeString()
    }, 'system');
});

socket.on('user_left_room', (data) => {
    displayMessage({
        username: '系统',
        message: `${data.username} 离开了房间`,
        time: new Date().toLocaleTimeString()
    }, 'system');
});

// 更新房间列表
function updateRoomList() {
    socket.emit('get_rooms');
}

socket.on('room_list', (data) => {
    const roomList = document.getElementById('rooms');
    roomList.innerHTML = '';
    data.rooms.forEach(room => {
        const roomItem = document.createElement('li');
        if(currentRoom === room.id) {
            roomItem.classList.add('current-room');
            roomItem.innerHTML = `
                <span class="room-name"><strong>${room.name}</strong> (${room.users.length}人) [当前房间]</span>
                <button class="leave-room" data-id="${room.id}">离开房间</button>
            `;
        } else {
            roomItem.innerHTML = `
                <span class="room-name">${room.name} (${room.users.length}人)</span>
                <button class="join-room" data-id="${room.id}">加入房间</button>
            `;
        }
        roomList.appendChild(roomItem);
    });
});

// 添加按钮加载状态
function setButtonLoading(button, isLoading) {
    if(isLoading) {
        button.disabled = true;
        button.innerHTML = button.innerHTML.replace(/>([^<]+)</, '>处理中...<');
    } else {
        button.disabled = false;
        if(button.classList.contains('join-room')) {
            button.innerHTML = button.innerHTML.replace(/>([^<]+)</, '>加入房间<');
        } else if(button.classList.contains('leave-room')) {
            button.innerHTML = button.innerHTML.replace(/>([^<]+)</, '>离开房间<');
        }
    }
}

// 私信功能
function openPrivateChat(user) {
    if (!privateChats[user]) {
        const pmWindow = document.createElement('div');
        pmWindow.className = 'pm-window';
        pmWindow.dataset.target = user;
        pmWindow.innerHTML = `
            <div class="pm-header">与 ${user} 的私信</div>
            <div class="pm-messages"></div>
            <input type="text" class="pm-input" placeholder="输入私信...">
            <button class="pm-send">发送</button>
        `;
        document.getElementById('pm-windows').appendChild(pmWindow);
        privateChats[user] = true;
        socket.emit('start_private_chat', { target_user: user });
    }
}

// 事件监听
document.addEventListener('click', (e) => {
    // 创建房间
    if (e.target.id === 'create-room-button') {
        const roomName = document.getElementById('room-name-input').value.trim();
        if (roomName) {
            socket.emit('create_room', { room_name: roomName });
            document.getElementById('room-name-input').value = '';
            // 创建房间后延迟更新列表
            setTimeout(updateRoomList, 300);
        }
    }
    
    // 加入房间
    if (e.target.classList.contains('join-room')) {
        const roomId = e.target.dataset.id;
        socket.emit('join_room', { room_id: roomId });
        currentRoom = roomId;
        // 加入房间后延迟更新列表
        setTimeout(updateRoomList, 300);
    }
    
    // 离开房间
    if (e.target.classList.contains('leave-room')) {
        const roomId = e.target.dataset.id;
        socket.emit('leave_room', { room_id: roomId }, (response) => {
            if (response.success) {
                currentRoom = null;
                displayMessage({
                    username: '系统',
                    message: `你已离开房间`,
                    time: new Date().toLocaleTimeString()
                }, 'system');
                updateRoomList();
            } else {
                displayMessage({
                    username: '系统',
                    message: `离开房间失败: ${response.error}`,
                    time: new Date().toLocaleTimeString()
                }, 'system');
            }
        });
    }
    
    // 私信
    if (e.target.classList.contains('pm-btn')) {
        const user = e.target.dataset.user;
        openPrivateChat(user);
    }
});

// 发送消息逻辑修改
document.getElementById('send-button').addEventListener('click', sendMessage);
document.getElementById('message-input').addEventListener('keypress', (e) => {
    if (e.key === 'Enter') sendMessage();
});

function sendMessage() {
    const messageInput = document.getElementById('message-input');
    const message = messageInput.value.trim();
    if (message) {
        if (currentRoom) {
            socket.emit('message', { 
                message, 
                room_id: currentRoom 
            });
        } else {
            socket.emit('message', { message });
        }
        messageInput.value = '';
    }
}

// 初始化
updateRoomList();

6. 新增登出路由(需同步更新用户状态)

@app.route('/logout')

def logout():

    """处理用户登出"""

    username = session.pop('username',  None)

    if username in online_users:

        online_users.remove(username)

        socketio.emit('user_leave',  {'username': username}, broadcast=True)

    flash('您已成功登出')

    return redirect(url_for('login'))

四、功能测试流程

基础通信测试

打开两个浏览器窗口分别登录不同账号

验证消息实时同步显示

检查控制台错误日志(目前无错误)

状态同步测试(失败)

新用户登录时观察其他客户端的用户列表更新

用户登出时验证状态同步

五、思考题

1.WebSocket连接建立时需要完成哪些握手过程?

WebSocket连接的建立需要经历以下握手过程:
客户端发起请求:客户端通过HTTP请求向服务器发起WebSocket连接请求,请求头中包含Upgrade: websocket和Connection: Upgrade,以及一个随机生成的Sec-WebSocket-Key。
服务器响应:服务器接收到请求后,验证请求头中的信息,并生成一个Sec-WebSocket-Accept值(通过对客户端的Sec-WebSocket-Key进行特定算法处理得到)。服务器返回一个HTTP响应,状态码为101 Switching Protocols,并包含Upgrade: websocket和Connection: Upgrade,以及Sec-WebSocket-Accept。
连接建立:客户端收到服务器的响应后,验证Sec-WebSocket-Accept值是否正确。如果验证通过,则WebSocket连接建立成功,双方可以通过WebSocket协议进行双向通信。

2. 为什么使用eventlet作为异步模式?对比gevent的异。

为什么使用eventlet:
兼容性:eventlet提供了对阻塞I/O操作的透明支持,能够自动将阻塞调用转换为非阻塞调用,适合在现有代码基础上实现异步化。
方便性:eventlet的API设计较为简洁,使用起来相对直观,适合快速开发异步程序。
eventlet与gevent的异同:
相同点:
都基于greenlet实现协程,通过事件循环来处理I/O操作,实现高效的并发处理。
都支持对阻塞I/O操作的协程化处理,能够提升程序的并发性能。
不同点:
API设计:eventlet的API设计更偏向于简洁和直观,而gevent的API设计相对更灵活,提供了更多的底层控制能力。
对阻塞操作的处理:eventlet会自动对阻塞I/O操作进行协程化,而gevent需要显式地对阻塞操作进行包装(如使用gevent.monkey.patch_all())。
社区和生态:gevent的社区相对更活跃,文档和资源更丰富,而eventlet的社区相对较小,但也有一定的用户群体。

3. socketio.run() 与标准 app.run() 有何本质区别?

app.run():
通常用于启动一个标准的Web应用(如Flask应用)。
主要处理HTTP请求和响应,不支持WebSocket协议。
是一个阻塞式的调用,会启动一个简单的开发服务器,适合开发和测试环境。
socketio.run():
用于启动一个支持WebSocket协议的服务器。
不仅可以处理HTTP请求,还可以处理WebSocket连接和消息通信。
内部集成了事件循环机制,支持异步处理WebSocket消息。
通常需要结合Flask-SocketIO等扩展使用,以实现WebSocket功能。

4. 广播消息时如何避免发送者收到重复消息?

在广播消息时,为了避免发送者收到重复消息,可以采取以下方法:
在发送广播消息时排除发送者:
在调用广播函数(如socketio.emit())时,通过skip_sid参数指定跳过发送者的会话ID(sid)。例如:
socketio.emit('message', data, broadcast=True, skip_sid=sender_sid)
这样可以确保消息不会被发送回发送者,避免重复接收。

5. 当前在线用户存储在内存中有何隐患?如何改进为Redis存储?

内存存储的隐患:
数据丢失:当服务器重启或崩溃时,内存中的数据会丢失,导致在线用户信息丢失。
可扩展性差:在多服务器部署的情况下,内存存储无法共享数据,每个服务器只能管理自己的在线用户信息。
资源限制:内存资源有限,随着用户数量的增加,可能会导致内存不足。
改进为Redis存储:
使用Redis存储在线用户信息:
将每个用户的会话信息(如sid、用户名等)存储在Redis中,使用键值对的形式存储。
可以通过Redis的SET命令存储用户信息,通过DEL命令删除用户信息。
import redis

r = redis.Redis(host='localhost', port=6379, db=0)

def add_online_user(user_id, sid):
    r.hset('online_users', user_id, sid)

def remove_online_user(user_id):
    r.hdel('online_users', user_id)

def get_online_users():
    return r.hgetall('online_users')
优点:
数据持久化:Redis支持数据持久化,即使服务器重启,数据也不会丢失。
可扩展性:Redis可以部署在多个服务器上,支持分布式存储,适合大规模应用。
性能高:Redis的读写速度非常快,适合存储在线用户信息。

6. 如何防止客户端发送恶意高频消息?

防止客户端发送恶意高频消息可以采取以下措施:
限制消息频率:
在服务器端设置消息频率限制,例如每秒最多接收一定数量的消息。
可以通过维护一个消息计数器和时间戳来实现频率限制。
示例代码:
from collections import defaultdict
from time import time

message_limits = defaultdict(lambda: {'count': 0, 'timestamp': time()})

def handle_message(sid, data):
    current_time = time()
    if current_time - message_limits[sid]['timestamp'] < 1:
        message_limits[sid]['count'] += 1
        if message_limits[sid]['count'] > 10:  # 每秒最多10条消息
            return "Message frequency exceeded"
    else:
        message_limits[sid]['count'] = 1
        message_limits[sid]['timestamp'] = current_time
    # 处理消息
    return "Message received"
使用令牌桶算法或漏桶算法:
令牌桶算法:允许在短时间内突发发送消息,但总体速率受限。
漏桶算法:严格限制消息的发送速率,平滑输出。
这些算法可以通过第三方库实现,也可以自己编写。

7. 消息事件中的 broadcast=True 参数有什么副作用?

broadcast=True 参数用于将消息广播给所有连接的客户端,但它可能会带来以下副作用:
性能问题:如果连接的客户端数量较多,广播消息会占用大量服务器资源,可能导致服务器性能下降。
消息过载:客户端可能会收到大量不必要的消息,导致客户端性能下降,甚至崩溃。
安全性问题:广播消息可能会将敏感信息发送给所有客户端,存在安全隐患。
为了避免这些问题,可以:
限制广播范围:通过room参数将消息广播限制在特定的房间内,而不是所有客户端。
优化消息内容:减少不必要的消息广播,只发送必要的信息。
分批广播:将消息分批发送,避免一次性发送大量消息。

8. 如何实现消息历史记录的持久化存储?

实现消息历史记录的持久化存储可以采用以下方法:
使用数据库存储:
将消息存储到关系型数据库(如MySQL、PostgreSQL)或非关系型数据库(如MongoDB、Redis)中。
每条消息可以存储为一条记录,包含消息内容、发送者、接收者、时间戳等信息。
示例代码(使用SQLite):
import sqlite3

conn = sqlite3.connect('messages.db')
cursor = conn.cursor()

cursor.execute('''
    CREATE TABLE IF NOT EXISTS messages (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        sender TEXT NOT NULL,
        receiver TEXT NOT NULL,
        content TEXT NOT NULL,
        timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
    )
''')

def save_message(sender, receiver, content):
    cursor.execute('''
        INSERT INTO messages (sender, receiver, content) VALUES (?, ?, ?)
    ''', (sender, receiver, content))
    conn.commit()

def get_message_history():
    cursor.execute('SELECT * FROM messages')
    return cursor.fetchall()
使用文件存储:
将消息以文本文件或JSON文件的形式存储到磁盘上。
适合轻量级应用,但不适合大规模数据存储。
使用消息队列:
将消息发送到消息队列(如RabbitMQ、Kafka)中,由其他服务负责处理和持久化存储。
适合高并发和分布式系统。
通过以上方法,可以实现消息历史记录的持久化存储,确保消息不会丢失,并支持后续的查询和分析。

实验完整代码以及资源:

#Python网络编程实验资源1-11完整版(含开发文档,开发代码,开发内容,每一个实验总结以及分类)资源-CSDN下载https://download.csdn.net/download/m0_73951999/91045318


网站公告

今日签到

点亮在社区的每一天
去签到