True or False? 基于 BERT 学生数学问题误解检测
代码详见:https://github.com/xiaozhou-alt/Student_Math_Misconception
文章目录
一、项目介绍
学生经常被要求解释他们的数学推理。这些解释为学生的思维提供了丰富的见解,并经常揭示潜在的误解(系统性的错误思维方式)。
例如,学生经常认为 0.355 大于 0.8,因为他们错误地将整数知识应用于小数,推断 355 大于 8。学生在数学中产生了一系列误解,有时是因为他们错误地将先验知识应用于新内容,有时是因为他们试图理解新信息但误解了它。要了解有关这些定义和框架的更多信息,请参阅 此处的链接报告。
将学生的解释标记为包含潜在的误解对于诊断反馈很有价值,但既耗时又难以扩展。误解可能很微妙,具体性各不相同,并随着学生推理新模式的出现而演变。
这个项目使用基于 BERT 的多任务学习框架,同时解决学生数学解题过程中的 三个关键任务:
- 答案正确性分类:判断学生答案 是否正确(二分类)
- 误解类型检测:识别学生在解题中 是否存在误解(二分类)
- 具体误解分类:识别学生解题中具体存在的 误解类型(多分类)
该系统通过联合学习三个相关任务,实现了知识迁移和模型参数共享,显著提升了各个任务的性能表现。项目采用 PyTorch 实现,包含完整的训练流程、早停机制、评估指标计算和结果可视化功能。
项目源于:MAP - Charting Student Math Misunderstandings | Kaggle
数据集下载(包含处理之后的数据集和原始数据集):Student_Math_Misconception (kaggle.com)
训练完成的模型下载:Student_Math_Misconception(kaggle.com)
二、文件夹结构
Student_Math_Misconception/
├── data/
│ ├── README-data.md
│ ├── class.json # 类别定义文件,包含Category和Misconception的类别
│ ├── data.csv # 原始数据集文件
│ ├── split_report.json # 数据集划分报告,记录训练/验证集划分详情
│ ├── train.csv # 划分后的训练集数据
│ └── val.csv # 划分后的验证集数据
├── log/ # 日志文件夹
└── output/ # 输出结果文件夹
├── pic/ # 图片输出
├── sample_predictions.txt # 模型预测结果示例
└── training_history.xlsx # 训练历史记录表格
├── README.md
├── data.ipynb # 数据分析和预处理
├── requirements.txt
├── train.py
三、数据集介绍
在 Eedi 上,学生回答诊断问题(DQ),这是一种多项选择题,具有一个正确答案和三个错误答案,称为干扰。在回答多项选择后,有时会要求学生提供书面解释来证明他们选择的答案是合理的。这些解释是 MAP 数据集的主要焦点,用于识别和解决学生推理中的潜在误解。
数据集包含学生解题记录,每行记录包含以下关键字段:
- [train/val].csv
QuestionId
- 唯一的问题标识符。QuestionText
- 问题的文本。MC_Answer
- 学生选择的多项选择题答案。StudentExplanation
- 学生对选择特定多项选择答案的解释。Category
- 对学生的多项选择题答案与其解释之间关系的分类(例如,True_Misconception,表示正确的多项选择题选择,并附有揭示误解的解释)。Misconception
- 学生对答案的解释中发现的数学误解。仅当类别包含误解时才适用,否则为“NA”。
类别定义存储在class.json
中,包含所有可能的Category
和Misconception
类型标签。
数据集前几行展示如下所示:
row_id | QuestionId | QuestionText | MC_Answer | StudentExplanation | Category | Misconception |
---|---|---|---|---|---|---|
0 0 0 | 31772 31772 31772 | What fraction of the shape is not shaded? Give your answer in its simplest form. [Image: A triangle split into 9 equal smaller triangles. 6 of them are shaded.] |
1 3 \frac{1}{3} 31 | One third is equal to tree nineth | True_Correct | NA |
… | … | … | … | … | … | … |
2763 2763 2763 | 31772 31772 31772 | What fraction of the shape is not shaded? Give your answer in its simplest form. [Image: A triangle split into 9 equal smaller triangles. 6 of them are shaded.] |
3 8 \frac{3}{8} 83 | $\frac{3}{9} because there is 9 9 9 triangles and three are not shaded so the answer = D D D | False_Misconception | Incomplete |
… | … | … | … | … | … | … |
11410 11410 11410 | 31778 31778 31778 | A 10 = 9 15 \frac{A}{10}=\frac{9}{15} 10A=159 What is the value of A A A ? | 4 4 4 | 10 + 5 = 15 10 + 5 = 15 10+5=15 and 9 9 9 is over 15 15 15 so you have to minus 5 5 5 from nine too. | False_Misconception | Additive |
… | … | … | … | … | … | … |
class.json(于 data.ipynb 中处理得到):
{
“Category”: [
“True_Correct”,
…
“False_Correct”
],
“Misconception”: [
“NA”,
“Incomplete”,
…
“Scale”
]
}
四、BERT 模型介绍
BERT(Bidirectional Encoder Representations from Transformers)是由 Google 于2018 年提出的基于 Transformer 架构的预训练语言模型,标志着自然语言处理领域进入预训练大模型时代。其核心创新在于通过双向 Transformer 编码器 捕捉上下文语义,突破了传统单向语言模型的限制。BERT 采用 掩码语言建模(MLM)和 下一句预测(NSP)两大预训练任务,在大规模无标注文本(如 Wikipedia)上学习通用语言表示,通过随机遮蔽 15 % 15\% 15% 的词汇要求模型还原原始文本,同时判断句子间的逻辑关系。这种预训练范式使模型能够捕获词汇、句法和语义的多层次特征,通过微调即可适配文本分类、问答、实体识别等下游任务。
五、项目实现
1. 数据预处理
将原始数据中的问题、答案和解释组合成单一文本输入,从 C a t e g o r y Category Category列提取三个任务的标签:
- i s _ c o r r e c t is\_correct is_correct: 答案是否正确(二分类)
- m i s c o n c e p t i o n _ t y p e misconception\_type misconception_type: 误解类型(多分类)
- m i s c o n c e p t i o n _ l a b e l misconception\_label misconception_label: 具体误解(多分类)
处理缺失值问题,确保所有标签都有有效值
# 数据预处理
def preprocess_data(df):
# 创建输入文本:问题 + 答案 + 解释
...
# 提取任务标签
...
# 修复缺失值问题
...
return df
2. 自定义数据类
- 继承 P y T o r c h PyTorch PyTorch 的 Dataset \textbf{Dataset} Dataset 类,实现自定义数据集
- 使用 BERT tokenizer 对文本进行编码:
- 添加特殊标记 [CLS] \textbf{[CLS]} [CLS]和 [SEP] \textbf{[SEP]} [SEP]
- 将序列填充/截断到固定长度
- 返回注意力掩码以忽略填充部分
- 返回包含输入 ID、注意力掩码、三个任务标签和原始文本的字典
# 自定义数据集
class MathDataset(Dataset):
def __init__(self, texts, task1_labels, task2_labels, task3_labels, tokenizer, max_len):
...
def __len__(self):
return len(self.texts) # 返回数据集大小
def __getitem__(self, idx):
text = str(self.texts[idx]) # 获取文本
# 使用tokenizer编码文本
encoding = self.tokenizer.encode_plus(
text,
add_special_tokens=True, # 添加[CLS]和[SEP]
max_length=self.max_len, # 截断/填充到最大长度
padding='max_length', # 填充到最大长度
truncation=True, # 启用截断
return_attention_mask=True, # 返回注意力掩码
return_tensors='pt', # 返回PyTorch张量
)
# 返回处理后的样本
return {
'input_ids': encoding['input_ids'].flatten(), # 输入ID
'attention_mask': encoding['attention_mask'].flatten(), # 注意力掩码
'task1_labels': torch.tensor(self.task1_labels[idx], dtype=torch.long), # 任务1标签
'task2_labels': torch.tensor(self.task2_labels[idx], dtype=torch.long), # 任务2标签
'task3_labels': torch.tensor(self.task3_labels[idx], dtype=torch.long), # 任务3标签
'text': text # 原始文本(用于后续分析)
}
3. 多任务 BERT 模型
使用预训练的 B E R T BERT BERT 模型作为共享编码器;在 B E R T BERT BERT 输出上添加 d r o p o u t dropout dropout 层防止过拟合;三个任务共享相同的 B E R T BERT BERT 编码,但使用独立的分类层; p o o l e r _ o u t p u t pooler\_output pooler_output 是 [CLS] \textbf{[CLS]} [CLS]标记的表示,通常用于分类任务;模型输出三个任务的 l o g i t s logits logits(未归一化的预测分数)
# 多任务模型
class MultiTaskBert(torch.nn.Module):
def __init__(self, num_task1_classes, num_task2_classes, num_task3_classes, model_name='bert-base-uncased'):
super().__init__()
# 共享的BERT编码器
...
# 任务特定的分类层
...
def forward(self, input_ids, attention_mask):
# 获取共享表示
outputs = self.bert(input_ids=input_ids, attention_mask=attention_mask)
pooled_output = outputs.pooler_output # 使用[CLS]的表示
pooled_output = self.dropout(pooled_output) # 应用dropout
# 任务特定输出
...
return task1_logits, task2_logits, task3_logits
4. 评估指标和早停机制
因为本次项目开发的模型是针对三个任务而言的,所以针对三个任务拥有 不同 的评估指标:
- 答案正确性分类:判断学生答案 是否正确(二分类)
- 误解类型检测:识别学生在解题中 是否存在误解(二分类)
- 具体误解分类:识别学生解题中具体存在的 误解类型(多分类)
任务1,2:以 准确率 (Accuracy) 作为评估标准:
A c c u r a c y = 判断的正确个数 验证集样本总数 Accuracy = \frac{判断的正确个数}{验证集样本总数} Accuracy=验证集样本总数判断的正确个数
任务3:以 MAP@3(Mean Average Precision at 3)作为评估标准:
M A P @ 3 = 1 U ∑ u = 1 U ∑ k = 1 min ( n , 3 ) P ( k ) × rel ( k ) MAP@3 = \frac{1}{U} \sum_{u=1}^{U} \sum_{k=1}^{\min(n, 3)} P(k) \times \operatorname{rel}(k) MAP@3=U1u=1∑Uk=1∑min(n,3)P(k)×rel(k)
其中 U U U 是观测数, P ( k ) P(k) P(k) 是截止时的精度 k k k, n n n 是每个观察提交的预测数,并且 r e l ( k ) rel(k) rel(k) 是一个指标函数,如果排名中的项目 k k k 是相关(正确)标签,否则为零。
计算每个样本预测的前 3 3 3 个类别中 是否包含真实标签,如果真实标签在前 k k k 个位置,精度为 1 k + 1 \frac{1}{k+1} k+11,最终结果是所有样本精度的 平均值
# MAP@3 计算函数
def map_at_3(y_true, y_pred_probs):
U = len(y_true) # 样本总数
ap_sum = 0.0 # 平均精度和
for i in range(U):
true_label = y_true[i] # 真实标签
top_3 = np.argsort(y_pred_probs[i])[::-1][:3] # 预测概率最高的前3个类别
ap = 0.0 # 单个样本的平均精度
# 检查前3个预测中是否有真实标签
for k in range(min(3, len(top_3))):
if top_3[k] == true_label:
ap = 1 / (k + 1) # 如果在位置k,则精度为1/(k+1)
break
ap_sum += ap # 累加平均精度
return ap_sum / U # 返回平均MAP@3
早停机制防止模型过拟合,当验证集指标在指定轮数内没有显著提升时停止训练,保存最佳模型状态,以便在训练结束后恢复, min_delta \textbf{min\_delta} min_delta确保改进是实质性的,避免因微小波动而停止
# 早停类
class EarlyStopper:
def __init__(self, patience=3, min_delta=0.001):
self.patience = patience # 容忍轮数
self.min_delta = min_delta # 最小改进值
self.counter = 0 # 计数器
self.best_metric = -np.inf # 最佳指标值
self.best_model_state = None # 最佳模型状态
self.epoch = 0 # 最佳模型所在轮数
def check(self, current_metric, model, epoch):
# 检查当前指标是否显著优于最佳指标
if current_metric > self.best_metric + self.min_delta:
self.best_metric = current_metric # 更新最佳指标
self.counter = 0 # 重置计数器
self.best_model_state = model.state_dict().copy() # 保存模型状态
self.epoch = epoch # 记录最佳轮数
return False # 不需要停止
else:
self.counter += 1 # 增加计数器
if self.counter >= self.patience:
return True # 需要停止
return False # 不需要停止
5. 预测和结果处理
从数据集中随机选择样本进行预测,使用 m o d e l . e v a l ( ) model.eval() model.eval()和 t o r c h . n o _ g r a d ( ) torch.no\_grad() torch.no_grad()确保预测阶段高效且节省内存,将数字标签解码回原始类别名称,提高结果可读性,为 任务1(答案正确性)生成中文标签,便于理解,返回包含原始文本、真实标签和预测标签的字典列表
将预测结果格式化为易读的文本输出,清晰展示每个任务的真实值和预测值
# 加载模型并进行预测
def load_and_predict(model, tokenizer, dataset, task2_encoder, task3_encoder, device, num_samples=10):
model.eval() # 设置为评估模式
# 随机选择样本索引
sample_indices = random.sample(range(len(dataset)), min(num_samples, len(dataset)))
results = [] # 存储预测结果
for idx in sample_indices:
...
# 获取预测结果(取最高概率的类别)
...
# 获取真实标签
...
# 解码标签(将数字标签转回原始类别名称)
...
# 存储结果
results.append({
...
})
return results
# 格式化输出预测结果
def format_prediction_results(results):
formatted = [] # 存储格式化后的文本行
for i, res in enumerate(results):
formatted.append(f"样本 {i+1}:") # 样本编号
formatted.append(f"文本: {res['text']}") # 原始文本
formatted.append(f"任务1 (答案是否正确):") # 任务1标题
formatted.append(f" 真实: {res['task1_true']} | 预测: {res['task1_pred']}") # 任务1结果
formatted.append(f"任务2 (误解类型):") # 任务2标题
formatted.append(f" 真实: {res['task2_true']} | 预测: {res['task2_pred']}") # 任务2结果
formatted.append(f"任务3 (具体误解):") # 任务3标题
formatted.append(f" 真实: {res['task3_true']} | 预测: {res['task3_pred']}") # 任务3结果
formatted.append("-" * 80) # 分隔线
return "\n".join(formatted) # 返回格式化后的字符串
6. 主函数(开始训练! 好像开始的有点晚)
模型训练部分直接包含在主函数中,没有单独封装成函数,所以按照代码的前后顺序,此处没有按照实际的运行顺序,而是按照代码从上到下的顺序进行讲解 ⌓‿⌓
- 加载训练集和验证集 CSV 文件,应用预处理函数处理数据
- 确定所有可能的类别,包括训练集和验证集中出现的类别
- 为 任务3 添加 N A NA NA类别处理缺失值,使用 L a b e l E n c o d e r LabelEncoder LabelEncoder 将文本标签转换为数字,便于模型处理
def main():
# 加载数据
train_df = pd.read_csv('/kaggle/input/student-math-misconception/data/train.csv')
val_df = pd.read_csv('/kaggle/input/student-math-misconception/data/val.csv')
# 预处理
train_df = preprocess_data(train_df)
val_df = preprocess_data(val_df)
# 确保所有类别都已知
...
# 检查并添加可能的缺失值
if 'NA' not in all_task3_classes:
...
# 标签编码
task2_encoder = LabelEncoder()
...
# 转换标签(将类别名称转换为数字)
train_df['task2_encoded'] = task2_encoder.transform(train_df['misconception_type'])
...
- 使用 BERT tokenizer 进行文本编码,创建训练集和验证集的自定义数据集实例
- 使用 D a t a L o a d e r DataLoader DataLoader创建批处理数据加载器:
- 训练集 洗牌 以提高训练效果
- 使用多线程加速数据加载
- 验证集不洗牌以确保结果一致性
ps:shuffle(中洗牌,混乱)。shuffle 在机器学习与深度学习中代表的意思是,将训练模型的数据集进行打乱的操作。原始的数据,在样本均衡的情况下可能是按照某种顺序进行排列,如前半部分为某一类别的数据,后半部分为另一类别的数据。但经过打乱之后数据的排列就会拥有一定的随机性,在顺序读取的时候下一次得到的样本为任何一类型的数据的可能性相同。本项目中问题类型相同的样本条目就被放到一起,直接读入就非常不利于模型训练。
# 初始化tokenizer
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
# 创建数据集
train_dataset = MathDataset(
texts=train_df.text.values,
task1_labels=train_df.is_correct.values,
task2_labels=train_df.task2_encoded.values,
task3_labels=train_df.task3_encoded.values,
tokenizer=tokenizer,
max_len=MAX_LEN
)
val_dataset = MathDataset(
...
)
# 创建数据加载器
train_loader = DataLoader(
train_dataset,
batch_size=BATCH_SIZE,
shuffle=True, # 训练集需要洗牌
num_workers=NUM_WORKERS # 多线程加载数据
)
val_loader = DataLoader(
...
)
- 初始化多任务 BERT 模型,根据类别数量配置输出层
- 使用 A d a m W AdamW AdamW 优化器,这是一种改进的 Adam 优化器
- 设置学习率调度器,使用 线性衰减 策略,为每个任务使用 交叉熵 损失函数
- 初始化早停机制,防止过拟合
- 创建字典记录训练过程中的各项指标
# 初始化模型
model = MultiTaskBert(
num_task1_classes=2, # 任务1:二分类
num_task2_classes=len(task2_encoder.classes_), # 任务2类别数
num_task3_classes=len(task3_encoder.classes_) # 任务3类别数
)
model = model.to(DEVICE) # 将模型移至指定设备
# 优化器和调度器
optimizer = AdamW(model.parameters(), lr=LEARNING_RATE) # AdamW优化器
...
# 损失函数
loss_fn1 = torch.nn.CrossEntropyLoss() # 任务1损失函数
...
# 初始化早停器
early_stopper = EarlyStopper(patience=PATIENCE, min_delta=MIN_DELTA)
# 训练历史记录
history = {
...
}
前向传播计算三个任务的输出,加权组合三个任务的损失(权重可调整),梯度裁剪防止梯度爆炸,记录并显示每个epoch的平均训练损失
# 训练循环(添加进度条)
for epoch in range(EPOCHS):
model.train() # 设置为训练模式
total_loss = 0 # 累计损失
# 使用tqdm包装训练数据加载器(添加进度条)
train_pbar = tqdm(train_loader,
desc=f"Epoch {epoch+1}/{EPOCHS} Training",
bar_format='{l_bar}{bar:20}{r_bar}{bar:-20b}',
dynamic_ncols=True)
for batch in train_pbar:
# 准备批数据
input_ids = batch['input_ids'].to(DEVICE)
attention_mask = batch['attention_mask'].to(DEVICE)
task1_labels = batch['task1_labels'].to(DEVICE)
task2_labels = batch['task2_labels'].to(DEVICE)
task3_labels = batch['task3_labels'].to(DEVICE)
optimizer.zero_grad() # 清零梯度
# 前向传播
task1_logits, task2_logits, task3_logits = model(
input_ids=input_ids,
attention_mask=attention_mask
)
# 计算各任务损失
loss1 = loss_fn1(task1_logits, task1_labels)
loss2 = loss_fn2(task2_logits, task2_labels)
loss3 = loss_fn3(task3_logits, task3_labels)
# 加权损失(权重可调整)
loss = 0.4 * loss1 + 0.3 * loss2 + 0.3 * loss3
total_loss += loss.item() # 累加损失
# 反向传播
loss.backward()
torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0) # 梯度裁剪
optimizer.step() # 更新参数
scheduler.step() # 更新学习率
# 更新进度条显示当前batch损失
train_pbar.set_postfix({
'batch_loss': f"{loss.item():.4f}", # 当前批次损失
'avg_loss': f"{total_loss / (train_pbar.n + 1):.4f}" # 平均损失
})
# 计算平均训练损失
avg_train_loss = total_loss / len(train_loader)
history['train_loss'].append(avg_train_loss)
六、结果展示
训练过程输出如下所示:
训练 损失 和三个任务的 评估指标 随训练轮数(Epoch)的变化如下图所示:
随机测试的 10 10 10 个样本预测结果(此处只展示个别样本输出):
样本 1:
文本: 问题: This is part of a regular polygon. How many sides does it have? [Image: A diagram showing an obtuse angle labelled 144 degrees] 答案: Not enough information 解释: because we kneed to know the total number of internal degrees.
任务1 (答案是否正确):
真实: 错误 | 预测: 错误
任务2 (误解类型):
真实: Misconception | 预测: Misconception
任务3 (具体误解):
真实: Unknowable | 预测: Unknowable
样本 2:
文本: 问题: The probability of an event occurring is ( 0.9 ).
Which of the following most accurately describes the likelihood of the event occurring? 答案: Likely 解释: because it is 0.9 which is close to 1 (certain) on the scale of probability.
任务1 (答案是否正确):
真实: 正确 | 预测: 正确
任务2 (误解类型):
真实: Correct | 预测: Correct
任务3 (具体误解):
真实: NA | 预测: NA
如果你喜欢我的文章,不妨给小周一个免费的点赞和关注吧!