在当今信息爆炸的时代,一个方便快捷的笔记管理系统对于学习和工作都至关重要。今天,小编将详细介绍如何使用Python的Flask框架构建一个功能完整的笔记网站——“我的笔记”。这个项目不仅包含了基本的CRUD(增删改查)功能,还实现了用户权限管理、Markdown支持、响应式设计等高级特性。
本教程适合有一定Python基础并希望学习Web开发的开发者。通过完成这个项目,你将掌握Flask开发的核心概念、前端与后端的交互方式以及Web应用部署的相关知识。
项目概述
功能特性
我们的"我的笔记"网站具备以下核心功能:
- 用户管理系统:注册、登录、权限控制和用户等级系统
- 笔记管理:创建、编辑、删除和查看笔记
- 评论系统:用户可以对笔记发表评论
- 管理员功能:用户管理、内容审核和反馈处理
- Markdown支持:使用Markdown语法编写富文本内容
- 响应式设计:适配桌面和移动设备
- 用户等级系统:根据用户活跃度显示不同等级标识
技术栈
- 后端框架:Flask (Python)
- 前端技术:HTML5, CSS3, JavaScript, Bootstrap 5
- 数据存储:JSON文件(也可轻松迁移到数据库)
- 模板引擎:Jinja2
- 加密技术:SHA-256密码加密
环境搭建与项目结构
安装必要的库
首先,确保你已安装Python 3.7+,然后安装所需的依赖库:
pip3 install flask
项目结构
创建以下项目结构:
my_notes/
├── app.py # 主应用文件
├── notes.json # 笔记数据文件(自动生成)
├── users.json # 用户数据文件(自动生成)
├── feedbacks.json # 反馈数据文件(自动生成)
└── templates/ # 模板目录
├── base.html # 基础模板
├── index.html # 首页
├── login.html # 登录页
├── register.html # 注册页
├── view_note.html # 笔记详情页
├── edit_note.html # 编辑笔记页
├── admin.html # 管理员面板
├── admin_feedbacks.html # 反馈管理页
├── feedback.html # 意见反馈页
├── delete_account.html # 账户注销页
├── 404.html # 404错误页
└── 500.html # 500错误页
核心代码解析
1. Flask应用初始化
让我们从Flask应用的初始化开始,这是整个项目的基础:
from flask import Flask, render_template, request, redirect, url_for, jsonify, flash, session
import json
import os
import re
import hashlib
from datetime import datetime
from functools import wraps
# 初始化Flask应用
app = Flask(__name__, template_folder=TEMPLATE_DIR)
app.secret_key = 'my_secret_key' # 用于会话加密
代码解析:
- 导入必要的Flask模块和其他辅助库
secret_key
用于加密会话数据,在生产环境中应使用更复杂的密钥- 设置模板文件夹路径,确保模板文件能被正确加载
2. 数据管理类设计
我们设计了三个核心管理类来处理不同的数据实体:
UserManager - 用户管理
class UserManager:
class UserManager:
""" 用户管理器 """
def __init__(self):
self.users_file = USERS_FILE
self.users = self.load_users()
def load_users(self):
""" 加载用户数据 """
if os.path.exists(self.users_file):
with open(self.users_file, 'r', encoding='utf-8') as f:
try:
users = json.load(f)
# 确保每个用户都有is_admin字段
for user in users:
if 'is_admin' not in user:
user['is_admin'] = False
if 'is_banned' not in user:
user['is_banned'] = False
return users
except json.JSONDecodeError:
return []
return []
def save_users(self):
""" 保存用户数据 """
os.makedirs(os.path.dirname(self.users_file), exist_ok=True)
with open(self.users_file, 'w', encoding='utf-8') as f:
json.dump(self.users, f, indent=2, ensure_ascii=False)
def register_user(self, username, password, nickname):
""" 注册新用户 """
# 检查用户名是否已存在
if any(user['username'] == username for user in self.users):
return False, "用户名已存在"
# 检查昵称是否已存在
if any(user['nickname'] == nickname for user in self.users):
return False, "昵称已存在"
# 创建新用户
new_user = {
"id": len(self.users) + 1,
"username": username,
"password": hash_password(password),
"nickname": nickname,
"is_admin": False, # 默认不是管理员
"is_banned": False, # 默认未被封禁
"created_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
}
self.users.append(new_user)
self.save_users()
return True, "注册成功"
def make_admin(self, username):
""" 将用户设为管理员 """
user = next((u for u in self.users if u['username'] == username), None)
if user:
user['is_admin'] = True
self.save_users()
return True
return False
def ban_user(self, user_id):
""" 封禁用户 """
user = self.get_user_by_id(user_id)
if user and not user['is_admin']: # 不能封禁管理员
user['is_banned'] = True
self.save_users()
return True
return False
def unban_user(self, user_id):
""" 解封用户 """
user = self.get_user_by_id(user_id)
if user:
user['is_banned'] = False
self.save_users()
return True
return False
def authenticate_user(self, username, password):
""" 验证用户 """
user = next((u for u in self.users if u['username'] == username), None)
if user and user['password'] == hash_password(password):
return user
return None
def get_user_by_id(self, user_id):
""" 通过ID获取用户 """
return next((u for u in self.users if u['id'] == user_id), None)
def get_user_by_nickname(self, nickname):
""" 通过昵称获取用户 """
return next((u for u in self.users if u['nickname'] == nickname), None)
def is_user_banned(self, nickname):
""" 检查用户是否被封禁 """
user = self.get_user_by_nickname(nickname)
return user and user.get('is_banned', False)
def delete_user(self, user_id):
"""删除用户账号"""
user = self.get_user_by_id(user_id)
if user and not user.get('is_admin', False): # 不能删除管理员
# 从用户列表中移除
self.users = [u for u in self.users if u['id'] != user_id]
self.save_users()
return True
return False
设计思路:
- 使用JSON文件作为数据存储,简化开发流程
- 通过类封装用户相关的所有操作,提高代码可维护性
- 包含用户验证、权限管理和状态管理功能
NoteManager - 笔记管理
class NoteManager:
""" 笔记管理器 """
def __init__(self):
self.notes_file = NOTES_FILE
self.notes = self.load_notes()
def load_notes(self):
""" 加载笔记数据 """
if os.path.exists(self.notes_file):
with open(self.notes_file, 'r', encoding='utf-8') as f:
try:
notes = json.load(f)
# 确保每个笔记都有comments字段和author字段
for note in notes:
if 'comments' not in note:
note['comments'] = []
if 'author' not in note:
note['author'] = '未知用户'
return notes
except json.JSONDecodeError:
return []
return []
def save_notes(self):
""" 保存笔记数据 """
os.makedirs(os.path.dirname(self.notes_file), exist_ok=True)
with open(self.notes_file, 'w', encoding='utf-8') as f:
json.dump(self.notes, f, indent=2, ensure_ascii=False)
def add_note(self, title, content, author):
""" 添加新笔记 """
max_id = max([note['id'] for note in self.notes]) if self.notes else 0
new_note = {
"id": max_id + 1,
"title": title,
"content": content,
"author": author,
"created_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"updated_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"history": [],
"comments": []
}
self.notes.append(new_note)
self.save_notes()
return new_note
def edit_note(self, note_id, new_title, new_content):
""" 修改笔记 """
note = self.find_note(note_id)
if not note:
return False
note['history'].append({
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"title": note['title'],
"content": note['content']
})
note['title'] = new_title
note['content'] = new_content
note['updated_at'] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
self.save_notes()
return True
def find_note(self, note_id):
""" 查找笔记 """
try:
note_id = int(note_id)
return next((n for n in self.notes if n['id'] == note_id), None)
except (ValueError, TypeError):
return None
def delete_note(self, note_id):
""" 删除笔记 """
note = self.find_note(note_id)
if not note:
return False
self.notes.remove(note)
self.save_notes()
return True
def add_comment(self, note_id, author, content):
""" 添加评论 """
note = self.find_note(note_id)
if not note:
return False
new_comment = {
"id": len(note['comments']) + 1,
"author": author,
"content": content,
"created_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
}
note['comments'].append(new_comment)
self.save_notes()
return True
def delete_comment(self, note_id, comment_id):
""" 删除评论 """
note = self.find_note(note_id)
if not note:
return False
comment = next((c for c in note['comments'] if c['id'] == comment_id), None)
if comment:
note['comments'].remove(comment)
self.save_notes()
return True
return False
def get_user_note_count(self, nickname):
""" 获取用户的笔记数量 """
return len([note for note in self.notes if note['author'] == nickname])
def get_visible_notes(self, user_manager):
""" 获取可见的笔记(过滤被封禁用户的内容) """
visible_notes = []
for note in self.notes:
# 检查笔记作者是否被封禁
if user_manager.is_user_banned(note['author']):
continue
# 过滤评论中被封禁用户的内容
filtered_comments = []
for comment in note['comments']:
if not user_manager.is_user_banned(comment['author']):
filtered_comments.append(comment)
# 创建过滤后的笔记副本
filtered_note = note.copy()
filtered_note['comments'] = filtered_comments
visible_notes.append(filtered_note)
return visible_notes
功能特点:
- 自动生成唯一ID和时间戳
- 支持版本历史记录(可用于实现撤销功能)
- 集成评论系统
FeedbackManager - 反馈管理
class FeedbackManager:
"""意见反馈管理器"""
def __init__(self):
self.feedbacks_file = FEEDBACKS_FILE
self.feedbacks = self.load_feedbacks()
def load_feedbacks(self):
"""加载反馈数据"""
if os.path.exists(self.feedbacks_file):
with open(self.feedbacks_file, 'r', encoding='utf-8') as f:
try:
return json.load(f)
except json.JSONDecodeError:
return []
return []
def save_feedbacks(self):
"""保存反馈数据"""
os.makedirs(os.path.dirname(self.feedbacks_file), exist_ok=True)
with open(self.feedbacks_file, 'w', encoding='utf-8') as f:
json.dump(self.feedbacks, f, indent=2, ensure_ascii=False)
def add_feedback(self, content, author):
"""添加新反馈"""
max_id = max([feedback['id'] for feedback in self.feedbacks]) if self.feedbacks else 0
new_feedback = {
"id": max_id + 1,
"content": content,
"author": author,
"created_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
}
self.feedbacks.append(new_feedback)
self.save_feedbacks()
return new_feedback
def delete_feedback(self, feedback_id):
"""删除反馈"""
feedback = next((f for f in self.feedbacks if f['id'] == feedback_id), None)
if feedback:
self.feedbacks.remove(feedback)
self.save_feedbacks()
return True
return False
def get_visible_feedbacks(self, user_manager):
""" 获取可见的反馈(过滤被封禁用户的内容) """
visible_feedbacks = []
for feedback in self.feedbacks:
# 检查反馈作者是否被封禁
if not user_manager.is_user_banned(feedback['author']):
visible_feedbacks.append(feedback)
return visible_feedbacks
3. 核心功能实现
用户认证与授权
密码加密:
def hash_password(password):
return hashlib.sha256(password.encode()).hexdigest()
使用SHA-256哈希算法加密密码,确保用户密码不以明文形式存储。
登录装饰器:
def login_required(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if 'user_id' not in session:
flash('请先登录', 'warning')
return redirect(url_for('login'))
return f(*args, **kwargs)
return decorated_function
这个装饰器确保只有登录用户才能访问受保护的页面。
管理员权限装饰器:
def admin_required(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if 'user_id' not in session:
flash('请先登录', 'warning')
return redirect(url_for('login'))
user_manager = UserManager()
user = user_manager.get_user_by_id(session['user_id'])
if not user or not user.get('is_admin', False):
flash('需要管理员权限', 'danger')
return redirect(url_for('index'))
return f(*args, **kwargs)
return decorated_function
Markdown解析功能
def markdown_to_html(md_text):
"""Markdown转HTML"""
if not md_text:
return ""
# 处理换行
md_text = md_text.replace("\n", "<br>")
# 处理Markdown语法
replacements = [
(r'^# (.*?)(<br>|$)', r'<h4>\1</h4>'), # 标题
(r'\*\*(.*?)\*\*', r'<strong>\1</strong>'), # 加粗
(r'\*(.*?)\*', r'<em>\1</em>'), # 斜体
(r'`(.*?)`', r'<code>\1</code>'), # 行内代码
(r'~~(.*?)~~', r'<del>\1</del>'), # 删除线
(r'^> (.*?)(<br>|$)', r'<blockquote>\1</blockquote>'), # 引用
(r'\[(.*?)\]\((.*?)\)', r'<a href="\2">\1</a>'), # 链接
]
for pattern, repl in replacements:
md_text = re.sub(pattern, repl, md_text)
return md_text
这个函数将Markdown语法转换为HTML,让用户可以轻松格式化他们的笔记内容。
用户等级系统
def get_user_level(note_count):
"""根据笔记数量计算用户等级"""
if note_count >= 401:
return 6
elif note_count >= 351:
return 5
elif note_count >= 201:
return 4
elif note_count >= 101:
return 3
elif note_count >= 51:
return 2
else:
return 1
def get_level_color(level):
colors = ['#ff4757', '#ffa502', '#ffd32a', '#2ed573', '#1e90ff', '#5352ed']
return colors[min(level - 1, 5)] if level > 0 else '#a4b0be'
用户等级系统根据用户创建的笔记数量来划分等级,不同等级显示不同颜色的标识,增加了用户的参与感和成就感。
4. 路由设计
用户认证路由
用户注册:
@app.route('/register', methods=['GET', 'POST'])
def register():
"""用户注册"""
if request.method == 'POST':
username = request.form.get('username')
password = request.form.get('password')
nickname = request.form.get('nickname')
if not all([username, password, nickname]):
flash('请填写所有必填字段', 'warning')
return render_template('register.html')
user_manager = UserManager()
success, message = user_manager.register_user(username, password, nickname)
if success:
flash(message, 'success')
return redirect(url_for('login'))
else:
flash(message, 'danger')
return render_template('register.html')
用户登录:
@app.route('/login', methods=['GET', 'POST'])
def login():
"""用户登录"""
if request.method == 'POST':
username = request.form.get('username')
password = request.form.get('password')
if not username or not password:
flash('请填写用户名和密码', 'warning')
return render_template('login.html')
user_manager = UserManager()
user = user_manager.authenticate_user(username, password)
if user:
if user.get('is_banned', False):
flash('您的账号已被封禁', 'danger')
return render_template('login.html')
# 设置会话信息
session['user_id'] = user['id']
session['username'] = user['username']
session['nickname'] = user['nickname']
session['is_admin'] = user.get('is_admin', False)
flash('登录成功', 'success')
return redirect(url_for('index'))
else:
flash('用户名或密码错误', 'danger')
return render_template('login.html')
笔记管理路由
首页 - 显示所有笔记:
@app.route('/')
def index():
"""首页"""
try:
note_manager = NoteManager()
user_manager = UserManager()
# 获取可见的笔记(过滤被封禁用户的内容)
visible_notes = note_manager.get_visible_notes(user_manager)
# 获取当前用户的笔记和评论数据(如果已登录)
my_notes = []
my_comments = []
if 'user_id' in session:
# 获取当前用户的笔记
my_notes = [note for note in note_manager.notes
if note['author'] == session.get('nickname') and
not user_manager.is_user_banned(session.get('nickname'))]
# 获取当前用户的评论
for note in note_manager.notes:
if not user_manager.is_user_banned(note['author']):
for comment in note.get('comments', []):
if comment['author'] == session.get('nickname'):
# 添加笔记信息到评论中
comment_with_note = comment.copy()
comment_with_note['note_id'] = note['id']
comment_with_note['note_title'] = note['title']
my_comments.append(comment_with_note)
return render_template('index.html',
notes=visible_notes,
my_notes=my_notes,
my_comments=my_comments,
markdown_to_html=markdown_to_html)
except Exception as e:
app.logger.error(f"Error loading index: {str(e)}")
flash("加载笔记时发生错误", "danger")
return render_template('index.html',
notes=[],
my_notes=[],
my_comments=[],
markdown_to_html=markdown_to_html)
添加笔记:
@app.route('/add', methods=['GET', 'POST'])
@login_required
def add_note():
"""添加笔记"""
if request.method == 'POST':
title = request.form.get('title')
content = request.form.get('content')
if title and content:
try:
note_manager = NoteManager()
note_manager.add_note(title, content, session.get('nickname', '匿名用户'))
flash("笔记添加成功", "success")
return redirect(url_for('index'))
except Exception as e:
app.logger.error(f"Error adding note: {str(e)}")
flash("添加笔记时发生错误", "danger")
else:
flash("标题和内容不能为空", "warning")
return render_template('edit_note.html', note=None)
管理员功能路由
管理员面板:
@app.route('/admin')
@admin_required
def admin_panel():
"""管理员面板"""
user_manager = UserManager()
note_manager = NoteManager()
users = user_manager.users
notes = note_manager.notes
# 为每个用户添加笔记数量信息
for user in users:
user['note_count'] = note_manager.get_user_note_count(user['nickname'])
user['level'] = get_user_level(user['note_count'])
user['level_color'] = get_level_color(user['level'])
return render_template('admin.html', users=users, notes=notes)
用户管理操作:
@app.route('/admin/ban_user/<int:user_id>')
@admin_required
def ban_user(user_id):
"""封禁用户"""
user_manager = UserManager()
if user_manager.ban_user(user_id):
flash('用户封禁成功', 'success')
else:
flash('封禁用户失败', 'danger')
return redirect(url_for('admin_panel'))
部署方式
1.使用Gunicorn部署Flask应用
Gunicorn是一个WSGI HTTP服务器,适合部署生产环境的Flask应用。安装Gunicorn后,通过命令行启动应用:
pip install gunicorn
gunicorn -w 4 -b 0.0.0.0:8000 app:app
-w
指定worker数量,app:app
中第一个app
是模块名,第二个是Flask实例名。
2.结合Nginx反向代理
Nginx作为反向代理可处理静态文件并提高安全性。配置Nginx的/etc/nginx/sites-available/your_app
:
server {
listen 80;
server_name your_domain.com;
location / {
proxy_pass http://127.0.0.1:8000;
proxy_set_header Host $host;
}
location /static {
alias /path/to/your/static/files;
}
}
启用配置后重启Nginx:
sudo ln -s /etc/nginx/sites-available/your_app /etc/nginx/sites-enabled
sudo systemctl restart nginx
3.使用Docker容器化部署
创建Dockerfile
构建镜像:
FROM python:3.9-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .
CMD ["gunicorn", "-w 4", "-b", "0.0.0.0:8000", "app:app"]
构建并运行容器:
docker build -t flask-app .
docker run -d -p 8000:8000 flask-app
4.云平台部署(Heroku)
安装Heroku CLI后,创建Procfile
指定启动命令:
web: gunicorn app:app
通过Git部署到Heroku:
heroku login
heroku create
git push heroku main
5.云平台部署(PythonAnyWhere)
- 打开PythonAnyWhere网址
- 注册一个初学者账户
- 登陆,然后点击
Files
,上传代码 - 点击
Web
,在第二步选择Flask
- 点击下一步,选择Python和Flask的版本
- 点击下一步,输入
app.py
的路径
6.使用Supervisor管理进程
Supervisor可确保应用崩溃后自动重启。配置/etc/supervisor/conf.d/flask_app.conf
:
[program:flask_app]
command=/path/to/gunicorn -w 4 -b 127.0.0.1:8000 app:app
directory=/path/to/your/app
user=your_user
autostart=true
autorestart=true
启动Supervisor服务:
sudo supervisorctl reread
sudo supervisorctl update
7.使用Apache和mod_wsgi
安装mod_wsgi模块后,配置Apache虚拟主机:
<VirtualHost *:80>
ServerName your_domain.com
WSGIDaemonProcess flask_app user=your_user threads=5
WSGIScriptAlias / /path/to/your/app.wsgi
<Directory /path/to/your/app>
WSGIProcessGroup flask_app
WSGIApplicationGroup %{GLOBAL}
Require all granted
</Directory>
</VirtualHost>
app.wsgi
文件内容需包含Flask应用实例的引用。
总结
通过本教程,我们完整地实现了一个功能丰富的笔记网站。这个项目涵盖了Web开发的多个重要方面:
- Flask框架的核心用法:路由、模板、请求处理等
- 用户认证与授权系统:登录、注册、权限控制
- 数据持久化:JSON文件操作(可扩展为数据库)
- 前端开发:Bootstrap响应式设计、JavaScript交互
- 安全考虑:密码加密、XSS防护、CSRF防护
- 错误处理与日志记录:完善的异常处理机制
这个项目不仅是一个可用的笔记应用,更是一个良好的学习模板。你可以基于这个项目继续扩展功能,如添加标签系统、笔记分享功能、实时协作编辑等。
最重要的是,通过完成这个项目,你已经掌握了使用Flask开发Web应用的核心技能,能够应对大多数中小型Web项目的开发需求。
下一步学习建议
- 学习数据库集成:将JSON存储替换为SQLAlchemy + PostgreSQL
- 探索前端框架:集成Vue.js或React提升用户体验
- 学习RESTful API设计:为移动应用提供API支持
- 掌握部署技能:学习使用Docker、Nginx部署应用
- 深入了解安全:学习Web应用安全最佳实践
希望本教程对你的学习之旅有所帮助,祝你编程愉快!