从零到一:使用Flask构建“我的笔记”网站

发布于:2025-08-31 ⋅ 阅读:(18) ⋅ 点赞:(0)

在当今信息爆炸的时代,一个方便快捷的笔记管理系统对于学习和工作都至关重要。今天,小编将详细介绍如何使用Python的Flask框架构建一个功能完整的笔记网站——“我的笔记”。这个项目不仅包含了基本的CRUD(增删改查)功能,还实现了用户权限管理、Markdown支持、响应式设计等高级特性。

本教程适合有一定Python基础并希望学习Web开发的开发者。通过完成这个项目,你将掌握Flask开发的核心概念、前端与后端的交互方式以及Web应用部署的相关知识。

项目概述

功能特性

我们的"我的笔记"网站具备以下核心功能:

  1. 用户管理系统:注册、登录、权限控制和用户等级系统
  2. 笔记管理:创建、编辑、删除和查看笔记
  3. 评论系统:用户可以对笔记发表评论
  4. 管理员功能:用户管理、内容审核和反馈处理
  5. Markdown支持:使用Markdown语法编写富文本内容
  6. 响应式设计:适配桌面和移动设备
  7. 用户等级系统:根据用户活跃度显示不同等级标识

技术栈

  • 后端框架:Flask (Python)
  • 前端技术:HTML5, CSS3, JavaScript, Bootstrap 5
  • 数据存储:JSON文件(也可轻松迁移到数据库)
  • 模板引擎:Jinja2
  • 加密技术:SHA-256密码加密

环境搭建与项目结构

安装必要的库

首先,确保你已安装Python 3.7+,然后安装所需的依赖库:

pip3 install flask

项目结构

创建以下项目结构:

my_notes/
├── app.py              # 主应用文件
├── notes.json          # 笔记数据文件(自动生成)
├── users.json          # 用户数据文件(自动生成)
├── feedbacks.json      # 反馈数据文件(自动生成)
└── templates/          # 模板目录
    ├── base.html       # 基础模板
    ├── index.html      # 首页
    ├── login.html      # 登录页
    ├── register.html   # 注册页
    ├── view_note.html  # 笔记详情页
    ├── edit_note.html  # 编辑笔记页
    ├── admin.html      # 管理员面板
    ├── admin_feedbacks.html # 反馈管理页
    ├── feedback.html   # 意见反馈页
    ├── delete_account.html # 账户注销页
    ├── 404.html        # 404错误页
    └── 500.html        # 500错误页

核心代码解析

1. Flask应用初始化

让我们从Flask应用的初始化开始,这是整个项目的基础:

from flask import Flask, render_template, request, redirect, url_for, jsonify, flash, session
import json
import os
import re
import hashlib
from datetime import datetime
from functools import wraps

# 初始化Flask应用
app = Flask(__name__, template_folder=TEMPLATE_DIR)
app.secret_key = 'my_secret_key'  # 用于会话加密

代码解析

  • 导入必要的Flask模块和其他辅助库
  • secret_key用于加密会话数据,在生产环境中应使用更复杂的密钥
  • 设置模板文件夹路径,确保模板文件能被正确加载

2. 数据管理类设计

我们设计了三个核心管理类来处理不同的数据实体:

UserManager - 用户管理
class UserManager:
class UserManager:
    """ 用户管理器 """
    def __init__(self):
        self.users_file = USERS_FILE
        self.users = self.load_users()

    def load_users(self):
        """ 加载用户数据 """
        if os.path.exists(self.users_file):
            with open(self.users_file, 'r', encoding='utf-8') as f:
                try:
                    users = json.load(f)
                    # 确保每个用户都有is_admin字段
                    for user in users:
                        if 'is_admin' not in user:
                            user['is_admin'] = False
                        if 'is_banned' not in user:
                            user['is_banned'] = False
                    return users
                except json.JSONDecodeError:
                    return []
        return []

    def save_users(self):
        """ 保存用户数据 """
        os.makedirs(os.path.dirname(self.users_file), exist_ok=True)
        with open(self.users_file, 'w', encoding='utf-8') as f:
            json.dump(self.users, f, indent=2, ensure_ascii=False)

    def register_user(self, username, password, nickname):
        """ 注册新用户 """
        # 检查用户名是否已存在
        if any(user['username'] == username for user in self.users):
            return False, "用户名已存在"

        # 检查昵称是否已存在
        if any(user['nickname'] == nickname for user in self.users):
            return False, "昵称已存在"

        # 创建新用户
        new_user = {
            "id": len(self.users) + 1,
            "username": username,
            "password": hash_password(password),
            "nickname": nickname,
            "is_admin": False,  # 默认不是管理员
            "is_banned": False,  # 默认未被封禁
            "created_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }

        self.users.append(new_user)
        self.save_users()
        return True, "注册成功"

    def make_admin(self, username):
        """ 将用户设为管理员 """
        user = next((u for u in self.users if u['username'] == username), None)
        if user:
            user['is_admin'] = True
            self.save_users()
            return True
        return False

    def ban_user(self, user_id):
        """ 封禁用户 """
        user = self.get_user_by_id(user_id)
        if user and not user['is_admin']:  # 不能封禁管理员
            user['is_banned'] = True
            self.save_users()
            return True
        return False

    def unban_user(self, user_id):
        """ 解封用户 """
        user = self.get_user_by_id(user_id)
        if user:
            user['is_banned'] = False
            self.save_users()
            return True
        return False

    def authenticate_user(self, username, password):
        """ 验证用户 """
        user = next((u for u in self.users if u['username'] == username), None)
        if user and user['password'] == hash_password(password):
            return user
        return None

    def get_user_by_id(self, user_id):
        """ 通过ID获取用户 """
        return next((u for u in self.users if u['id'] == user_id), None)

    def get_user_by_nickname(self, nickname):
        """ 通过昵称获取用户 """
        return next((u for u in self.users if u['nickname'] == nickname), None)

    def is_user_banned(self, nickname):
        """ 检查用户是否被封禁 """
        user = self.get_user_by_nickname(nickname)
        return user and user.get('is_banned', False)

    def delete_user(self, user_id):
        """删除用户账号"""
        user = self.get_user_by_id(user_id)
        if user and not user.get('is_admin', False):  # 不能删除管理员
            # 从用户列表中移除
            self.users = [u for u in self.users if u['id'] != user_id]
            self.save_users()
            return True
        return False

设计思路

  • 使用JSON文件作为数据存储,简化开发流程
  • 通过类封装用户相关的所有操作,提高代码可维护性
  • 包含用户验证、权限管理和状态管理功能
NoteManager - 笔记管理
class NoteManager:
    """ 笔记管理器 """
    def __init__(self):
        self.notes_file = NOTES_FILE
        self.notes = self.load_notes()

    def load_notes(self):
        """ 加载笔记数据 """
        if os.path.exists(self.notes_file):
            with open(self.notes_file, 'r', encoding='utf-8') as f:
                try:
                    notes = json.load(f)
                    # 确保每个笔记都有comments字段和author字段
                    for note in notes:
                        if 'comments' not in note:
                            note['comments'] = []
                        if 'author' not in note:
                            note['author'] = '未知用户'
                    return notes
                except json.JSONDecodeError:
                    return []
        return []

    def save_notes(self):
        """ 保存笔记数据 """
        os.makedirs(os.path.dirname(self.notes_file), exist_ok=True)
        with open(self.notes_file, 'w', encoding='utf-8') as f:
            json.dump(self.notes, f, indent=2, ensure_ascii=False)

    def add_note(self, title, content, author):
        """ 添加新笔记 """
        max_id = max([note['id'] for note in self.notes]) if self.notes else 0
        new_note = {
            "id": max_id + 1,
            "title": title,
            "content": content,
            "author": author,
            "created_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "updated_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "history": [],
            "comments": []
        }
        self.notes.append(new_note)
        self.save_notes()
        return new_note

    def edit_note(self, note_id, new_title, new_content):
        """ 修改笔记 """
        note = self.find_note(note_id)
        if not note:
            return False

        note['history'].append({
            "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "title": note['title'],
            "content": note['content']
        })

        note['title'] = new_title
        note['content'] = new_content
        note['updated_at'] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        self.save_notes()
        return True

    def find_note(self, note_id):
        """ 查找笔记 """
        try:
            note_id = int(note_id)
            return next((n for n in self.notes if n['id'] == note_id), None)
        except (ValueError, TypeError):
            return None

    def delete_note(self, note_id):
        """ 删除笔记 """
        note = self.find_note(note_id)
        if not note:
            return False
        self.notes.remove(note)
        self.save_notes()
        return True

    def add_comment(self, note_id, author, content):
        """ 添加评论 """
        note = self.find_note(note_id)
        if not note:
            return False

        new_comment = {
            "id": len(note['comments']) + 1,
            "author": author,
            "content": content,
            "created_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }

        note['comments'].append(new_comment)
        self.save_notes()
        return True

    def delete_comment(self, note_id, comment_id):
        """ 删除评论 """
        note = self.find_note(note_id)
        if not note:
            return False

        comment = next((c for c in note['comments'] if c['id'] == comment_id), None)
        if comment:
            note['comments'].remove(comment)
            self.save_notes()
            return True
        return False

    def get_user_note_count(self, nickname):
        """ 获取用户的笔记数量 """
        return len([note for note in self.notes if note['author'] == nickname])

    def get_visible_notes(self, user_manager):
        """ 获取可见的笔记(过滤被封禁用户的内容) """
        visible_notes = []
        for note in self.notes:
            # 检查笔记作者是否被封禁
            if user_manager.is_user_banned(note['author']):
                continue

            # 过滤评论中被封禁用户的内容
            filtered_comments = []
            for comment in note['comments']:
                if not user_manager.is_user_banned(comment['author']):
                    filtered_comments.append(comment)

            # 创建过滤后的笔记副本
            filtered_note = note.copy()
            filtered_note['comments'] = filtered_comments
            visible_notes.append(filtered_note)

        return visible_notes

功能特点

  • 自动生成唯一ID和时间戳
  • 支持版本历史记录(可用于实现撤销功能)
  • 集成评论系统
FeedbackManager - 反馈管理
class FeedbackManager:
    """意见反馈管理器"""
    def __init__(self):
        self.feedbacks_file = FEEDBACKS_FILE
        self.feedbacks = self.load_feedbacks()

    def load_feedbacks(self):
        """加载反馈数据"""
        if os.path.exists(self.feedbacks_file):
            with open(self.feedbacks_file, 'r', encoding='utf-8') as f:
                try:
                    return json.load(f)
                except json.JSONDecodeError:
                    return []
        return []

    def save_feedbacks(self):
        """保存反馈数据"""
        os.makedirs(os.path.dirname(self.feedbacks_file), exist_ok=True)
        with open(self.feedbacks_file, 'w', encoding='utf-8') as f:
            json.dump(self.feedbacks, f, indent=2, ensure_ascii=False)

    def add_feedback(self, content, author):
        """添加新反馈"""
        max_id = max([feedback['id'] for feedback in self.feedbacks]) if self.feedbacks else 0
        new_feedback = {
            "id": max_id + 1,
            "content": content,
            "author": author,
            "created_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }
        self.feedbacks.append(new_feedback)
        self.save_feedbacks()
        return new_feedback

    def delete_feedback(self, feedback_id):
        """删除反馈"""
        feedback = next((f for f in self.feedbacks if f['id'] == feedback_id), None)
        if feedback:
            self.feedbacks.remove(feedback)
            self.save_feedbacks()
            return True
        return False

    def get_visible_feedbacks(self, user_manager):
        """ 获取可见的反馈(过滤被封禁用户的内容) """
        visible_feedbacks = []
        for feedback in self.feedbacks:
            # 检查反馈作者是否被封禁
            if not user_manager.is_user_banned(feedback['author']):
                visible_feedbacks.append(feedback)

        return visible_feedbacks

3. 核心功能实现

用户认证与授权

密码加密

def hash_password(password):
    return hashlib.sha256(password.encode()).hexdigest()

使用SHA-256哈希算法加密密码,确保用户密码不以明文形式存储。

登录装饰器

def login_required(f):
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if 'user_id' not in session:
            flash('请先登录', 'warning')
            return redirect(url_for('login'))
        return f(*args, **kwargs)
    return decorated_function

这个装饰器确保只有登录用户才能访问受保护的页面。

管理员权限装饰器

def admin_required(f):
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if 'user_id' not in session:
            flash('请先登录', 'warning')
            return redirect(url_for('login'))
            
        user_manager = UserManager()
        user = user_manager.get_user_by_id(session['user_id'])
        
        if not user or not user.get('is_admin', False):
            flash('需要管理员权限', 'danger')
            return redirect(url_for('index'))
            
        return f(*args, **kwargs)
    return decorated_function
Markdown解析功能
def markdown_to_html(md_text):
    """Markdown转HTML"""
    if not md_text:
        return ""

    # 处理换行
    md_text = md_text.replace("\n", "<br>")

    # 处理Markdown语法
    replacements = [
        (r'^# (.*?)(<br>|$)', r'<h4>\1</h4>'), # 标题
        (r'\*\*(.*?)\*\*', r'<strong>\1</strong>'), # 加粗
        (r'\*(.*?)\*', r'<em>\1</em>'), # 斜体
        (r'`(.*?)`', r'<code>\1</code>'), # 行内代码
        (r'~~(.*?)~~', r'<del>\1</del>'), # 删除线
        (r'^> (.*?)(<br>|$)', r'<blockquote>\1</blockquote>'), # 引用
        (r'\[(.*?)\]\((.*?)\)', r'<a href="\2">\1</a>'), # 链接
    ]

    for pattern, repl in replacements:
        md_text = re.sub(pattern, repl, md_text)
    return md_text

这个函数将Markdown语法转换为HTML,让用户可以轻松格式化他们的笔记内容。

用户等级系统
def get_user_level(note_count):
    """根据笔记数量计算用户等级"""
    if note_count >= 401:
        return 6
    elif note_count >= 351:
        return 5
    elif note_count >= 201:
        return 4
    elif note_count >= 101:
        return 3
    elif note_count >= 51:
        return 2
    else:
        return 1

def get_level_color(level):
    colors = ['#ff4757', '#ffa502', '#ffd32a', '#2ed573', '#1e90ff', '#5352ed']
    return colors[min(level - 1, 5)] if level > 0 else '#a4b0be'

用户等级系统根据用户创建的笔记数量来划分等级,不同等级显示不同颜色的标识,增加了用户的参与感和成就感。

4. 路由设计

用户认证路由

用户注册

@app.route('/register', methods=['GET', 'POST'])
def register():
    """用户注册"""
    if request.method == 'POST':
        username = request.form.get('username')
        password = request.form.get('password')
        nickname = request.form.get('nickname')

        if not all([username, password, nickname]):
            flash('请填写所有必填字段', 'warning')
            return render_template('register.html')

        user_manager = UserManager()
        success, message = user_manager.register_user(username, password, nickname)

        if success:
            flash(message, 'success')
            return redirect(url_for('login'))
        else:
            flash(message, 'danger')

    return render_template('register.html')

用户登录

@app.route('/login', methods=['GET', 'POST'])
def login():
    """用户登录"""
    if request.method == 'POST':
        username = request.form.get('username')
        password = request.form.get('password')

        if not username or not password:
            flash('请填写用户名和密码', 'warning')
            return render_template('login.html')

        user_manager = UserManager()
        user = user_manager.authenticate_user(username, password)

        if user:
            if user.get('is_banned', False):
                flash('您的账号已被封禁', 'danger')
                return render_template('login.html')

            # 设置会话信息
            session['user_id'] = user['id']
            session['username'] = user['username']
            session['nickname'] = user['nickname']
            session['is_admin'] = user.get('is_admin', False)
            flash('登录成功', 'success')
            return redirect(url_for('index'))
        else:
            flash('用户名或密码错误', 'danger')

    return render_template('login.html')
笔记管理路由

首页 - 显示所有笔记

@app.route('/')
def index():
    """首页"""
    try:
        note_manager = NoteManager()
        user_manager = UserManager()

        # 获取可见的笔记(过滤被封禁用户的内容)
        visible_notes = note_manager.get_visible_notes(user_manager)

        # 获取当前用户的笔记和评论数据(如果已登录)
        my_notes = []
        my_comments = []

        if 'user_id' in session:
            # 获取当前用户的笔记
            my_notes = [note for note in note_manager.notes
                       if note['author'] == session.get('nickname') and
                       not user_manager.is_user_banned(session.get('nickname'))]

            # 获取当前用户的评论
            for note in note_manager.notes:
                if not user_manager.is_user_banned(note['author']):
                    for comment in note.get('comments', []):
                        if comment['author'] == session.get('nickname'):
                            # 添加笔记信息到评论中
                            comment_with_note = comment.copy()
                            comment_with_note['note_id'] = note['id']
                            comment_with_note['note_title'] = note['title']
                            my_comments.append(comment_with_note)

        return render_template('index.html',
                             notes=visible_notes,
                             my_notes=my_notes,
                             my_comments=my_comments,
                             markdown_to_html=markdown_to_html)
    except Exception as e:
        app.logger.error(f"Error loading index: {str(e)}")
        flash("加载笔记时发生错误", "danger")
        return render_template('index.html',
                             notes=[],
                             my_notes=[],
                             my_comments=[],
                             markdown_to_html=markdown_to_html)

添加笔记

@app.route('/add', methods=['GET', 'POST'])
@login_required
def add_note():
    """添加笔记"""
    if request.method == 'POST':
        title = request.form.get('title')
        content = request.form.get('content')

        if title and content:
            try:
                note_manager = NoteManager()
                note_manager.add_note(title, content, session.get('nickname', '匿名用户'))
                flash("笔记添加成功", "success")
                return redirect(url_for('index'))
            except Exception as e:
                app.logger.error(f"Error adding note: {str(e)}")
                flash("添加笔记时发生错误", "danger")
        else:
            flash("标题和内容不能为空", "warning")

    return render_template('edit_note.html', note=None)
管理员功能路由

管理员面板

@app.route('/admin')
@admin_required
def admin_panel():
    """管理员面板"""
    user_manager = UserManager()
    note_manager = NoteManager()

    users = user_manager.users
    notes = note_manager.notes

    # 为每个用户添加笔记数量信息
    for user in users:
        user['note_count'] = note_manager.get_user_note_count(user['nickname'])
        user['level'] = get_user_level(user['note_count'])
        user['level_color'] = get_level_color(user['level'])

    return render_template('admin.html', users=users, notes=notes)

用户管理操作

@app.route('/admin/ban_user/<int:user_id>')
@admin_required
def ban_user(user_id):
    """封禁用户"""
    user_manager = UserManager()
    if user_manager.ban_user(user_id):
        flash('用户封禁成功', 'success')
    else:
        flash('封禁用户失败', 'danger')

    return redirect(url_for('admin_panel'))

完整代码链接
我的笔记网站链接

部署方式

1.使用Gunicorn部署Flask应用

Gunicorn是一个WSGI HTTP服务器,适合部署生产环境的Flask应用。安装Gunicorn后,通过命令行启动应用:

pip install gunicorn
gunicorn -w 4 -b 0.0.0.0:8000 app:app

-w指定worker数量,app:app中第一个app是模块名,第二个是Flask实例名。

2.结合Nginx反向代理

Nginx作为反向代理可处理静态文件并提高安全性。配置Nginx的/etc/nginx/sites-available/your_app

server {
    listen 80;
    server_name your_domain.com;

    location / {
        proxy_pass http://127.0.0.1:8000;
        proxy_set_header Host $host;
    }

    location /static {
        alias /path/to/your/static/files;
    }
}

启用配置后重启Nginx:

sudo ln -s /etc/nginx/sites-available/your_app /etc/nginx/sites-enabled
sudo systemctl restart nginx

3.使用Docker容器化部署

创建Dockerfile构建镜像:

FROM python:3.9-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .
CMD ["gunicorn", "-w 4", "-b", "0.0.0.0:8000", "app:app"]

构建并运行容器:

docker build -t flask-app .
docker run -d -p 8000:8000 flask-app

4.云平台部署(Heroku)

安装Heroku CLI后,创建Procfile指定启动命令:

web: gunicorn app:app

通过Git部署到Heroku:

heroku login
heroku create
git push heroku main

5.云平台部署(PythonAnyWhere)

  1. 打开PythonAnyWhere网址
  2. 注册一个初学者账户
  3. 登陆,然后点击Files,上传代码
  4. 点击Web,在第二步选择Flask
  5. 点击下一步,选择Python和Flask的版本
  6. 点击下一步,输入app.py的路径

6.使用Supervisor管理进程

Supervisor可确保应用崩溃后自动重启。配置/etc/supervisor/conf.d/flask_app.conf

[program:flask_app]
command=/path/to/gunicorn -w 4 -b 127.0.0.1:8000 app:app
directory=/path/to/your/app
user=your_user
autostart=true
autorestart=true

启动Supervisor服务:

sudo supervisorctl reread
sudo supervisorctl update

7.使用Apache和mod_wsgi

安装mod_wsgi模块后,配置Apache虚拟主机:

<VirtualHost *:80>
    ServerName your_domain.com
    WSGIDaemonProcess flask_app user=your_user threads=5
    WSGIScriptAlias / /path/to/your/app.wsgi

    <Directory /path/to/your/app>
        WSGIProcessGroup flask_app
        WSGIApplicationGroup %{GLOBAL}
        Require all granted
    </Directory>
</VirtualHost>

app.wsgi文件内容需包含Flask应用实例的引用。

总结

通过本教程,我们完整地实现了一个功能丰富的笔记网站。这个项目涵盖了Web开发的多个重要方面:

  1. Flask框架的核心用法:路由、模板、请求处理等
  2. 用户认证与授权系统:登录、注册、权限控制
  3. 数据持久化:JSON文件操作(可扩展为数据库)
  4. 前端开发:Bootstrap响应式设计、JavaScript交互
  5. 安全考虑:密码加密、XSS防护、CSRF防护
  6. 错误处理与日志记录:完善的异常处理机制

这个项目不仅是一个可用的笔记应用,更是一个良好的学习模板。你可以基于这个项目继续扩展功能,如添加标签系统、笔记分享功能、实时协作编辑等。

最重要的是,通过完成这个项目,你已经掌握了使用Flask开发Web应用的核心技能,能够应对大多数中小型Web项目的开发需求。

下一步学习建议

  1. 学习数据库集成:将JSON存储替换为SQLAlchemy + PostgreSQL
  2. 探索前端框架:集成Vue.js或React提升用户体验
  3. 学习RESTful API设计:为移动应用提供API支持
  4. 掌握部署技能:学习使用Docker、Nginx部署应用
  5. 深入了解安全:学习Web应用安全最佳实践

希望本教程对你的学习之旅有所帮助,祝你编程愉快!