import argparse
import math
import pickle
import torch
import torch.nn as nn
import torch.nn.functional as F
from tqdm import tqdm
from omegaconf import OmegaConf
from sklearn.metrics import f1_score
from torch.utils.data import Dataset, DataLoader
from torch.nn import TransformerEncoderLayer, TransformerEncoder
import matplotlib.pyplot as plt
restypes = [
'A', 'R', 'N', 'D', 'C',
'Q', 'E', 'G', 'H', 'I',
'L', 'K', 'M', 'F', 'P',
'S', 'T', 'W', 'Y', 'V'
]
unsure_restype = 'X'
unknown_restype = 'U'
def make_dataset(data_config, train_rate=0.7, valid_rate=0.2):
data_path = data_config.data_path
with open(data_path, 'rb') as f:
data = pickle.load(f)
total_number = len(data)
train_sep = int(total_number * train_rate)
valid_sep = int(total_number * (train_rate + valid_rate))
train_data_dicts = data[:train_sep]
valid_data_dicts = data[train_sep:valid_sep]
test_data_dicts = data[valid_sep:]
train_dataset = DisProtDataset(train_data_dicts)
valid_dataset = DisProtDataset(valid_data_dicts)
test_dataset = DisProtDataset(test_data_dicts)
return train_dataset, valid_dataset, test_dataset
class DisProtDataset(Dataset):
def __init__(self, dict_data):
sequences = [d['sequence'] for d in dict_data]
labels = [d['label'] for d in dict_data]
assert len(sequences) == len(labels)
self.sequences = sequences
self.labels = labels
self.residue_mapping = {'X':20}
self.residue_mapping.update(dict(zip(restypes, range(len(restypes)))))
def __len__(self):
return len(self.sequences)
def __getitem__(self, idx):
sequence = torch.zeros(len(self.sequences[idx]), len(self.residue_mapping))
for i, c in enumerate(self.sequences[idx]):
if c not in restypes:
c = 'X'
sequence[i][self.residue_mapping[c]] = 1
label = torch.tensor([int(c) for c in self.labels[idx]], dtype=torch.long)
return sequence, label
class PositionalEncoding(nn.Module):
def __init__(self, d_model, dropout=0.0, max_len=40):
super().__init__()
position = torch.arange(max_len).unsqueeze(1)
div_term = torch.exp(
torch.arange(0, d_model, 2) * (-math.log(10000.0) / d_model)
)
pe = torch.zeros(1, max_len, d_model)
pe[0, :, 0::2] = torch.sin(position * div_term)
pe[0, :, 1::2] = torch.cos(position * div_term)
self.register_buffer("pe", pe)
self.dropout = nn.Dropout(p=dropout)
def forward(self, x):
if len(x.shape) == 3:
x = x + self.pe[:, : x.size(1)]
elif len(x.shape) == 4:
x = x + self.pe[:, :x.size(1), None, :]
return self.dropout(x)
class DisProtModel(nn.Module):
def __init__(self, model_config):
super().__init__()
self.d_model = model_config.d_model
self.n_head = model_config.n_head
self.n_layer = model_config.n_layer
self.input_layer = nn.Linear(model_config.i_dim, self.d_model)
self.position_embed = PositionalEncoding(self.d_model, max_len=20000)
self.input_norm = nn.LayerNorm(self.d_model)
self.dropout_in = nn.Dropout(p=0.1)
encoder_layer = TransformerEncoderLayer(
d_model=self.d_model,
nhead=self.n_head,
activation='gelu',
batch_first=True)
self.transformer = TransformerEncoder(encoder_layer, num_layers=self.n_layer)
self.output_layer = nn.Sequential(
nn.Linear(self.d_model, self.d_model),
nn.GELU(),
nn.Dropout(p=0.1),
nn.Linear(self.d_model, model_config.o_dim)
)
def forward(self, x):
x = self.input_layer(x)
x = self.position_embed(x)
x = self.input_norm(x)
x = self.dropout_in(x)
x = self.transformer(x)
x = self.output_layer(x)
return x
def metric_fn(pred, gt):
pred = pred.detach().cpu()
gt = gt.detach().cpu()
pred_labels = torch.argmax(pred, dim=-1).view(-1)
gt_labels = gt.view(-1)
score = f1_score(y_true=gt_labels, y_pred=pred_labels, average='micro')
return score
if __name__ == '__main__':
device = 'cuda' if torch.cuda.is_available() else 'cpu'
parser = argparse.ArgumentParser('IDRs prediction')
parser.add_argument('--config_path', default='./config.yaml')
args = parser.parse_args()
config = OmegaConf.load(args.config_path)
train_dataset, valid_dataset, test_dataset = make_dataset(config.data)
train_dataloader = DataLoader(dataset=train_dataset, batch_size=config.train.dataloader.batch_size, shuffle=config.train.dataloader.shuffle, num_workers=config.train.dataloader.num_workers, drop_last=config.train.dataloader.drop_last)
valid_dataloader = DataLoader(dataset=valid_dataset, batch_size=1, shuffle=False)
lrs = [2e-4, 1e-5, 1e-4, 1e-3] # 学习率
scores = []
best_model = None
best_score = -1
for lr in lrs:
model = DisProtModel(config.model)
model = model.to(device)
optimizer = torch.optim.AdamW(model.parameters(),
lr=lr,
weight_decay=config.train.optimizer.weight_decay)
loss_fn = nn.CrossEntropyLoss()
model.eval()
metric = 0.
with torch.no_grad():
for sequence, label in valid_dataloader:
sequence = sequence.to(device)
label = label.to(device)
pred = model(sequence)
metric += metric_fn(pred, label)
init_score = metric / len(valid_dataloader)
all_scores = [init_score]
for epoch in range(config.train.epochs):
# train loop
progress_bar = tqdm(
train_dataloader,
initial=0,
desc=f"epoch:{epoch:03d}, lr: {lr}",
)
model.train()
total_loss = 0.
for sequence, label in progress_bar:
sequence = sequence.to(device)
label = label.to(device)
pred = model(sequence)
loss = loss_fn(pred.permute(0, 2, 1), label)
progress_bar.set_postfix(loss=loss.item())
total_loss += loss.item()
optimizer.zero_grad()
loss.backward()
optimizer.step()
avg_loss = total_loss / len(train_dataloader)
# valid loop
model.eval()
metric = 0.
with torch.no_grad():
for sequence, label in valid_dataloader:
sequence = sequence.to(device)
label = label.to(device)
pred = model(sequence)
metric += metric_fn(pred, label)
score = metric / len(valid_dataloader)
all_scores.append(score)
print(f"avg_training_loss: {avg_loss}, f1_score: {score}")
scores.append(all_scores[-1])
if all_scores[-1] > best_score:
best_score = all_scores[-1]
best_model = model
# 绘制不同学习率下的最终 F1 分数
plt.figure(figsize=(10, 6))
plt.plot([str(lr) for lr in lrs], scores, marker='o')
plt.xlabel('Learning Rate')
plt.ylabel('F1 Score')
plt.title('F1 Score vs Learning Rate')
plt.grid(True)
plt.show()
# 保存得分最高的模型
if best_model is not None:
torch.save(best_model.state_dict(), 'model.pth')
print("Best model saved as model.pth")
pip install -r requirements.txt -i https://mirrors.aliyun.com/pypi/simple/
gensim==4.3.3
joblib
numpy
pandas
scikit_learn
omegaconf
torch
tqdm==4.67.1
matplotlib==3.10.1
import argparse
import pickle
import torch
from tqdm import tqdm
from omegaconf import OmegaConf
import pandas as pd
from nn_model_baseline import DisProtModel, metric_fn, restypes
def predict_test(path, model_config_path='./config.yaml'):
device = 'cuda' if torch.cuda.is_available() else 'cpu'
config = OmegaConf.load(model_config_path)
# 加载模型
model = DisProtModel(config.model)
model = model.to(device)
model.eval()
model.load_state_dict(torch.load('model.pth', map_location=device))
# 加载测试数据
with open(path, 'rb') as f:
test_data = pickle.load(f)
submit_data = []
with torch.no_grad():
for data in tqdm(test_data, desc="Predicting"):
sequence_str = data['sequence']
sequence = torch.zeros(len(sequence_str), len(restypes) + 1)
residue_mapping = {'X':20}
residue_mapping.update(dict(zip(restypes, range(len(restypes)))))
for i, c in enumerate(sequence_str):
if c not in restypes:
c = 'X'
sequence[i][residue_mapping[c]] = 1
sequence = sequence.unsqueeze(0).to(device) # 添加批次维度
# 模型预测
pred = model(sequence)
pred_labels = torch.argmax(pred, dim=-1).squeeze().cpu().numpy()
submit_data.append([
data['id'], data['sequence'], "".join([str(c) for c in pred_labels])
])
submit_df = pd.DataFrame(submit_data)
submit_df.columns = ["proteinID", "sequence", "IDRs"]
submit_df.to_csv("/saisresult/submitsdata/WSAA_data_test.csv", index=None)
if __name__ == '__main__':
predict_test('WSAA_data_test.pkl')
BERT实体识别 实践
虽然BERT最初是为NLP自然语言处理设计的,但其强大的语言表示能力也可以应用于生物信息学中的蛋白质序列分析,例如本次蛋白质固有无序区域(IDRs)的预测。
步骤1:数据预处理
序列编码:将蛋白质的氨基酸序列转换为BERT可以处理的格式。每个氨基酸可以用单字母表示(如A、C、D等)。由于BERT是基于字符的模型,可以直接将氨基酸序列作为输入。
标签处理:将每个氨基酸位置的无序区域标签(0或1)作为目标标签。例如,
1
表示该位置属于无序区域,0
表示不属于无序区域。分词:虽然蛋白质序列是连续的氨基酸序列,但可以将其视为一个“句子”,每个氨基酸视为一个“词”。BERT的输入格式通常是一个序列,因此可以直接将整个氨基酸序列作为输入。
# 导入必要的库
import pickle
import torch
from torch.utils.data import Dataset, DataLoader
from transformers import BertTokenizer, BertModel
import torch.nn as nn
import torch.optim as optim
# 数据集
data_path = 'WSAA_data_public.pkl'
# 步骤1:数据预处理
class DisProtDataset(Dataset):
def __init__(self, data_dicts):
self.data_dicts = data_dicts
self.tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
def __len__(self):
return len(self.data_dicts)
def __getitem__(self, idx):
# 获取氨基酸序列和对应的标签
sequence = self.data_dicts[idx]['sequence']
labels = self.data_dicts[idx]['labels']
# 序列编码
inputs = self.tokenizer(sequence, return_tensors='pt', add_special_tokens=False)
input_ids = inputs['input_ids'].squeeze(0)
attention_mask = inputs['attention_mask'].squeeze(0)
# 标签处理
labels = torch.tensor(labels, dtype=torch.float32)
return input_ids, attention_mask, labels
# 数据划分
def make_dataset(data_path, train_rate=0.7, valid_rate=0.2):
with open(data_path, 'rb') as f:
data = pickle.load(f)
total_number = len(data)
train_sep = int(total_number * train_rate)
valid_sep = int(total_number * (train_rate + valid_rate))
train_data_dicts = data[:train_sep]
valid_data_dicts = data[train_sep:valid_sep]
test_data_dicts = data[valid_sep:]
train_dataset = DisProtDataset(train_data_dicts)
valid_dataset = DisProtDataset(valid_data_dicts)
test_dataset = DisProtDataset(test_data_dicts)
return train_dataset, valid_dataset, test_dataset
# 步骤2:加载BERT模型
class BERTForIDRPrediction(nn.Module):
def __init__(self):
super(BERTForIDRPrediction, self).__init__()
self.bert = BertModel.from_pretrained('bert-base-uncased')
self.classifier = nn.Sequential(
nn.Linear(self.bert.config.hidden_size, 1),
nn.Sigmoid()
)
def forward(self, input_ids, attention_mask):
outputs = self.bert(input_ids=input_ids, attention_mask=attention_mask)
sequence_output = outputs.last_hidden_state
logits = self.classifier(sequence_output).squeeze(-1)
return logits
# 步骤3:模型微调
def train_model(model, train_loader, valid_loader, epochs=3, lr=2e-5):
criterion = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr=lr)
for epoch in range(epochs):
model.train()
total_train_loss = 0
for input_ids, attention_mask, labels in train_loader:
optimizer.zero_grad()
outputs = model(input_ids, attention_mask)
loss = criterion(outputs, labels)
loss.backward()
optimizer.step()
total_train_loss += loss.item()
model.eval()
total_valid_loss = 0
with torch.no_grad():
for input_ids, attention_mask, labels in valid_loader:
outputs = model(input_ids, attention_mask)
loss = criterion(outputs, labels)
total_valid_loss += loss.item()
print(f'Epoch {epoch + 1}/{epochs}, Train Loss: {total_train_loss / len(train_loader)}, Valid Loss: {total_valid_loss / len(valid_loader)}')
# 主程序
if __name__ == '__main__':
train_dataset, valid_dataset, test_dataset = make_dataset(data_path)
train_loader = DataLoader(train_dataset, batch_size=4, shuffle=True)
valid_loader = DataLoader(valid_dataset, batch_size=4, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=4, shuffle=False)
model = BERTForIDRPrediction()
train_model(model, train_loader, valid_loader)
pip install -r requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simple/