Pytorch深度学习框架60天进阶学习计划 - 第39天:联邦学习系统
今天我们将深入学习三个关键部分:差分隐私机制、横向/纵向联邦学习的数据划分策略,以及模型聚合的安全协议。
1. 差分隐私机制详解
1.1 什么是差分隐私?
差分隐私(Differential Privacy,DP)是一种数学框架,它允许我们在保护个体隐私的同时从数据中提取有用信息。简单来说,差分隐私确保无论数据集中是否包含某个个体的数据,查询结果都应该大致相同。
差分隐私的核心思想可以用一个形象的比喻来理解:想象你是一名统计学家,需要了解某个城市居民的平均收入。如果你直接收集并公布数据,可能会泄露个人信息。使用差分隐私,你会在真实统计结果上添加一定的"噪声",这样即使知道结果,也无法确定任何特定个体的信息。
差分隐私的关键特性:
- 具有数学上的严格保证
- 可以量化隐私保护的程度(通过ε参数)
- 具有组合特性,可以跟踪多次查询的隐私损失
1.2 差分隐私的数学定义
在数学上,如果一个随机化算法M满足以下条件,我们就说它提供了ε-差分隐私:
对于任意两个仅相差一个元素的数据集D和D’,以及算法M的所有可能输出S:
P r [ M ( D ) ∈ S ] ≤ e ε × P r [ M ( D ′ ) ∈ S ] Pr[M(D) \in S] \leq e^{\varepsilon} \times Pr[M(D') \in S] Pr[M(D)∈S]≤eε×Pr[M(D′)∈S]
这里,ε是隐私预算,控制着隐私保护的强度。ε越小,隐私保护越强,但准确性可能降低;ε越大,准确性提高,但隐私保护减弱。
1.3 常用的差分隐私机制
下面是几种在联邦学习中常用的差分隐私机制:
机制名称 | 适用场景 | 原理 | 优点 | 缺点 |
---|---|---|---|---|
拉普拉斯机制 | 数值查询 | 添加拉普拉斯分布噪声 | 实现简单,理论保证好 | 对异常值敏感 |
高斯机制 | 数值查询 | 添加高斯分布噪声 | 与许多机器学习算法兼容性好 | 提供的是(ε,δ)-DP,而非纯ε-DP |
指数机制 | 非数值查询 | 基于效用函数概率选择输出 | 适用范围广 | 计算复杂度高 |
随机响应 | 布尔查询 | 以一定概率翻转真实答案 | 直观易懂 | 仅适用于简单查询 |
1.4 PyTorch中实现差分隐私的示例代码
PyTorch提供了opacus
库来简化差分隐私训练的实现。下面是一个使用差分隐私训练简单神经网络的例子:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from opacus import PrivacyEngine
import numpy as np
# 设置随机种子以确保结果可复现
torch.manual_seed(42)
np.random.seed(42)
# 1. 创建一个简单的神经网络
class SimpleNN(nn.Module):
def __init__(self, input_dim, hidden_dim, output_dim):
super(SimpleNN, self).__init__()
self.layer1 = nn.Linear(input_dim, hidden_dim)
self.activation = nn.ReLU()
self.layer2 = nn.Linear(hidden_dim, output_dim)
def forward(self, x):
x = self.layer1(x)
x = self.activation(x)
x = self.layer2(x)
return x
# 2. 生成一些模拟数据
def generate_sample_data(n_samples=1000, input_dim=20):
X = np.random.randn(n_samples, input_dim)
# 创建二分类标签
y = np.random.randint(0, 2, size=n_samples)
return X, y
# 3. 准备数据
X, y = generate_sample_data()
X_tensor = torch.FloatTensor(X)
y_tensor = torch.LongTensor(y)
dataset = TensorDataset(X_tensor, y_tensor)
dataloader = DataLoader(dataset, batch_size=64, shuffle=True)
# 4. 初始化模型和优化器
input_dim = X.shape[1]
hidden_dim = 50
output_dim = 2 # 二分类
model = SimpleNN(input_dim, hidden_dim, output_dim)
optimizer = optim.SGD(model.parameters(), lr=0.05)
criterion = nn.CrossEntropyLoss()
# 5. 添加差分隐私
privacy_engine = PrivacyEngine()
model, optimizer, dataloader = privacy_engine.make_private(
module=model,
optimizer=optimizer,
data_loader=dataloader,
noise_multiplier=1.0, # 控制添加的噪声大小
max_grad_norm=1.0, # 梯度裁剪范数
)
# 6. 训练模型
def train(epochs=10):
model.train()
for epoch in range(epochs):
total_loss = 0
for data, target in dataloader:
optimizer.zero_grad()
output = model(data)
loss = criterion(output, target)
loss.backward()
optimizer.step()
total_loss += loss.item()
# 每个epoch打印隐私预算
epsilon = privacy_engine.get_epsilon(delta=1e-5)
print(f"Epoch {epoch+1}/{epochs}, Loss: {total_loss/len(dataloader):.4f}, ε: {epsilon:.2f}")
# 7. 执行训练
print("开始使用差分隐私训练模型...")
train()
# 8. 在没有差分隐私的情况下训练同样的模型进行对比
print("\n开始训练没有差分隐私保护的基准模型...")
model_no_dp = SimpleNN(input_dim, hidden_dim, output_dim)
optimizer_no_dp = optim.SGD(model_no_dp.parameters(), lr=0.05)
dataloader_no_dp = DataLoader(dataset, batch_size=64, shuffle=True)
model_no_dp.train()
for epoch in range(10):
total_loss = 0
for data, target in dataloader_no_dp:
optimizer_no_dp.zero_grad()
output = model_no_dp(data)
loss = criterion(output, target)
loss.backward()
optimizer_no_dp.step()
total_loss += loss.item()
print(f"Epoch {epoch+1}/10, Loss: {total_loss/len(dataloader_no_dp):.4f}")
# 9. 比较两个模型的准确率
model.eval()
model_no_dp.eval()
correct_dp = 0
correct_no_dp = 0
with torch.no_grad():
for data, target in dataloader:
output_dp = model(data)
pred_dp = output_dp.argmax(dim=1)
correct_dp += pred_dp.eq(target).sum().item()
output_no_dp = model_no_dp(data)
pred_no_dp = output_no_dp.argmax(dim=1)
correct_no_dp += pred_no_dp.eq(target).sum().item()
print(f"\n差分隐私模型准确率: {100. * correct_dp / len(dataset):.2f}%")
print(f"基准模型准确率: {100. * correct_no_dp / len(dataset):.2f}%")
1.5 差分隐私在联邦学习中的应用流程图
+-------------------+ +----------------------+ +------------------------+
| 本地数据训练阶段 | | 差分隐私保护阶段 | | 模型聚合与更新阶段 |
+-------------------+ +----------------------+ +------------------------+
| 1. 客户端接收模型 | | 1. 计算模型参数梯度 | | 1. 服务器收集受保护的 |
| 2. 使用本地数据 | -> | 2. 应用梯度裁剪 | -> | 梯度或模型参数 |
| 进行前向计算 | | 3. 添加校准噪声 | | 2. 聚合参数 |
| 3. 计算损失 | | 4. 更新本地模型 | | 3. 更新全局模型 |
+-------------------+ +----------------------+ +------------------------+
2. 横向联邦学习与纵向联邦学习
联邦学习按照数据分割方式主要分为横向联邦学习和纵向联邦学习。理解它们的区别对于设计合适的联邦学习系统至关重要。
2.1 横向联邦学习与纵向联邦学习的比较
特性 | 横向联邦学习 (HFL) | 纵向联邦学习 (VFL) |
---|---|---|
数据特点 | 各参与方拥有相同特征空间的不同样本 | 各参与方拥有相同样本的不同特征 |
典型应用场景 | 不同地区医院拥有相同类型的病例数据 | 银行和电商平台拥有同一用户的不同维度信息 |
实现复杂度 | 相对简单 | 更为复杂,需要对齐实体 |
隐私保护重点 | 样本隐私 | 特征隐私 |
模型构建方式 | 同构模型,训练目标相同 | 异构模型,可能需要复杂协议进行协作 |
通信效率 | 每轮需传输完整模型参数 | 只需传输部分参数和中间结果 |
2.2 横向联邦学习示例代码
下面是一个使用PyTorch实现横向联邦学习的简化示例:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from sklearn.datasets import load_digits
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
# 设置随机种子
torch.manual_seed(42)
np.random.seed(42)
# 1. 准备数据集并模拟分布式环境
digits = load_digits()
X, y = digits.data, digits.target
X = StandardScaler().fit_transform(X) # 标准化数据
# 将数据分成3个客户端
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# 模拟3个不同的客户端数据集(水平划分)
client_data = []
num_clients = 3
client_proportions = [0.4, 0.3, 0.3] # 每个客户端拥有的数据比例
start_idx = 0
for i in range(num_clients):
end_idx = start_idx + int(len(X_train) * client_proportions[i])
client_data.append((
torch.FloatTensor(X_train[start_idx:end_idx]),
torch.LongTensor(y_train[start_idx:end_idx])
))
start_idx = end_idx
# 2. 定义模型
class DigitClassifier(nn.Module):
def __init__(self, input_dim=64, hidden_dim=50, output_dim=10):
super(DigitClassifier, self).__init__()
self.fc1 = nn.Linear(input_dim, hidden_dim)
self.relu = nn.ReLU()
self.fc2 = nn.Linear(hidden_dim, output_dim)
def forward(self, x):
x = self.fc1(x)
x = self.relu(x)
x = self.fc2(x)
return x
# 3. 定义联邦学习过程
class FederatedLearning:
def __init__(self, model, client_data, test_data, num_rounds=10, local_epochs=5, lr=0.01):
self.model = model
self.client_data = client_data
self.test_data = test_data
self.num_rounds = num_rounds
self.local_epochs = local_epochs
self.lr = lr
self.criterion = nn.CrossEntropyLoss()
def train_client(self, client_id, global_model):
"""在单个客户端上训练模型"""
# 复制全局模型参数
local_model = DigitClassifier()
local_model.load_state_dict(global_model.state_dict())
optimizer = optim.SGD(local_model.parameters(), lr=self.lr)
local_model.train()
X_client, y_client = self.client_data[client_id]
for _ in range(self.local_epochs):
optimizer.zero_grad()
outputs = local_model(X_client)
loss = self.criterion(outputs, y_client)
loss.backward()
optimizer.step()
return local_model.state_dict()
def aggregate_models(self, model_dicts):
"""使用FedAvg算法聚合客户端模型"""
client_sizes = [len(self.client_data[i][0]) for i in range(len(self.client_data))]
total_size = sum(client_sizes)
# 初始化全局模型字典
global_dict = {}
for k in model_dicts[0].keys():
global_dict[k] = torch.zeros_like(model_dicts[0][k])
# 加权平均参数
for i, model_dict in enumerate(model_dicts):
weight = client_sizes[i] / total_size
for k in global_dict.keys():
global_dict[k] += model_dict[k] * weight
return global_dict
def evaluate(self, model):
"""评估模型性能"""
model.eval()
X_test, y_test = self.test_data
with torch.no_grad():
outputs = model(X_test)
_, predicted = torch.max(outputs.data, 1)
accuracy = (predicted == y_test).sum().item() / y_test.size(0)
return accuracy
def train(self):
"""执行联邦学习过程"""
global_model = self.model
for round_num in range(self.num_rounds):
print(f"联邦学习轮次 {round_num+1}/{self.num_rounds}")
# 收集每个客户端的模型
client_models = []
for client_id in range(len(self.client_data)):
print(f" 训练客户端 {client_id+1}")
client_model = self.train_client(client_id, global_model)
client_models.append(client_model)
# 聚合模型
global_dict = self.aggregate_models(client_models)
global_model.load_state_dict(global_dict)
# 评估全局模型
accuracy = self.evaluate(global_model)
print(f" 轮次 {round_num+1} 完成, 测试准确率: {accuracy:.4f}")
return global_model
# 4. 运行联邦学习
model = DigitClassifier()
X_test_tensor = torch.FloatTensor(X_test)
y_test_tensor = torch.LongTensor(y_test)
test_data = (X_test_tensor, y_test_tensor)
federated_learning = FederatedLearning(
model=model,
client_data=client_data,
test_data=test_data,
num_rounds=10,
local_epochs=5,
lr=0.01
)
final_model = federated_learning.train()
# 5. 评估最终模型
final_accuracy = federated_learning.evaluate(final_model)
print(f"\n最终模型准确率: {final_accuracy:.4f}")
2.3 纵向联邦学习示例代码
下面是一个使用PyTorch实现的纵向联邦学习简化示例:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
# 设置随机种子
torch.manual_seed(42)
np.random.seed(42)
# 1. 加载数据集
cancer = load_breast_cancer()
X, y = cancer.data, cancer.target
X = StandardScaler().fit_transform(X) # 标准化数据
# 划分训练集和测试集
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# 2. 垂直划分特征(假设有两个参与方)
# 参与方A拥有前15个特征
# 参与方B拥有后15个特征
feature_split = 15
X_train_A = X_train[:, :feature_split]
X_train_B = X_train[:, feature_split:]
X_test_A = X_test[:, :feature_split]
X_test_B = X_test[:, feature_split:]
# 转换为PyTorch张量
X_train_A_tensor = torch.FloatTensor(X_train_A)
X_train_B_tensor = torch.FloatTensor(X_train_B)
y_train_tensor = torch.FloatTensor(y_train)
X_test_A_tensor = torch.FloatTensor(X_test_A)
X_test_B_tensor = torch.FloatTensor(X_test_B)
y_test_tensor = torch.FloatTensor(y_test)
# 3. 定义参与方模型
class PartyModel(nn.Module):
def __init__(self, input_dim, embedding_dim):
super(PartyModel, self).__init__()
self.fc = nn.Sequential(
nn.Linear(input_dim, embedding_dim),
nn.ReLU()
)
def forward(self, x):
return self.fc(x)
# 4. 定义顶层模型(由可信的第三方持有)
class TopModel(nn.Module):
def __init__(self, input_dim):
super(TopModel, self).__init__()
self.fc = nn.Sequential(
nn.Linear(input_dim, 1),
nn.Sigmoid()
)
def forward(self, x):
return self.fc(x)
# 5. 纵向联邦学习训练过程
class VerticalFederatedLearning:
def __init__(self, party_A_model, party_B_model, top_model,
party_A_data, party_B_data, labels,
test_A_data, test_B_data, test_labels,
epochs=50, lr=0.01):
self.party_A_model = party_A_model
self.party_B_model = party_B_model
self.top_model = top_model
self.party_A_data = party_A_data
self.party_B_data = party_B_data
self.labels = labels
self.test_A_data = test_A_data
self.test_B_data = test_B_data
self.test_labels = test_labels
self.epochs = epochs
self.lr = lr
# 初始化优化器
self.optimizer_A = optim.Adam(self.party_A_model.parameters(), lr=lr)
self.optimizer_B = optim.Adam(self.party_B_model.parameters(), lr=lr)
self.optimizer_top = optim.Adam(self.top_model.parameters(), lr=lr)
# 损失函数
self.criterion = nn.BCELoss()
def train(self):
for epoch in range(self.epochs):
# 前向传播
# 1. 各参与方计算自己的嵌入向量
embedding_A = self.party_A_model(self.party_A_data)
embedding_B = self.party_B_model(self.party_B_data)
# 2. 将嵌入向量发送给可信的第三方
combined_embedding = torch.cat((embedding_A, embedding_B), dim=1)
# 3. 第三方使用顶层模型进行预测
predictions = self.top_model(combined_embedding).squeeze()
# 4. 计算损失
loss = self.criterion(predictions, self.labels)
# 反向传播和优化
self.optimizer_A.zero_grad()
self.optimizer_B.zero_grad()
self.optimizer_top.zero_grad()
loss.backward()
self.optimizer_A.step()
self.optimizer_B.step()
self.optimizer_top.step()
# 每10个epoch评估一次模型
if (epoch + 1) % 10 == 0:
accuracy = self.evaluate()
print(f"Epoch {epoch+1}/{self.epochs}, Loss: {loss.item():.4f}, Accuracy: {accuracy:.4f}")
def evaluate(self):
self.party_A_model.eval()
self.party_B_model.eval()
self.top_model.eval()
with torch.no_grad():
embedding_A = self.party_A_model(self.test_A_data)
embedding_B = self.party_B_model(self.test_B_data)
combined_embedding = torch.cat((embedding_A, embedding_B), dim=1)
predictions = self.top_model(combined_embedding).squeeze()
predicted_labels = (predictions > 0.5).float()
accuracy = (predicted_labels == self.test_labels).float().mean()
self.party_A_model.train()
self.party_B_model.train()
self.top_model.train()
return accuracy.item()
# 6. 初始化模型
embedding_dim = 8
party_A_model = PartyModel(input_dim=X_train_A.shape[1], embedding_dim=embedding_dim)
party_B_model = PartyModel(input_dim=X_train_B.shape[1], embedding_dim=embedding_dim)
top_model = TopModel(input_dim=embedding_dim*2) # 两个参与方的嵌入向量拼接
# 7. 创建并执行纵向联邦学习
vfl = VerticalFederatedLearning(
party_A_model=party_A_model,
party_B_model=party_B_model,
top_model=top_model,
party_A_data=X_train_A_tensor,
party_B_data=X_train_B_tensor,
labels=y_train_tensor,
test_A_data=X_test_A_tensor,
test_B_data=X_test_B_tensor,
test_labels=y_test_tensor,
epochs=100,
lr=0.01
)
print("开始纵向联邦学习训练...")
vfl.train()
# 8. 评估最终模型
final_accuracy = vfl.evaluate()
print(f"\n最终模型准确率: {final_accuracy:.4f}")
2.4 横向与纵向联邦学习的数据划分策略流程图
横向联邦学习数据划分:
+-------------+ +-------------+ +-------------+
| 客户端 A | | 客户端 B | | 客户端 C |
+-------------+ +-------------+ +-------------+
| 样本 1-1000 | | 样本 1001- | | 样本 2001- |
| 特征 1-100 | | 2000 | | 3000 |
| | | 特征 1-100 | | 特征 1-100 |
+-------------+ +-------------+ +-------------+
| | |
v v v
+-------------------------------------------+
| 中央服务器 |
| (聚合模型参数,但不获取原始数据) |
+-------------------------------------------+
纵向联邦学习数据划分:
+-------------+ +-------------+ +-------------+
| 客户端 A | | 客户端 B | | 客户端 C |
+-------------+ +-------------+ +-------------+
| 样本 1-3000 | | 样本 1-3000 | | 样本 1-3000 |
| 特征 1-30 | | 特征 31-60 | | 特征 61-100 |
+-------------+ +-------------+ +-------------+
| | |
v v v
+-------------------------------------------+
| 第三方协调者 |
| (协调训练过程,但不获取原始特征) |
+-------------------------------------------+
3. 模型聚合的安全协议
联邦学习中的模型聚合是一个关键步骤,需要安全协议来保护参与方的隐私。
3.1 常见的安全聚合协议
下面是几种常见的安全聚合协议比较:
协议名称 | 核心思想 | 安全保证 | 通信开销 | 计算开销 | 适用场景 |
---|---|---|---|---|---|
安全聚合 (SecAgg) | 使用秘密共享和掩码 | 隐藏个体更新 | 中等 | 中等 | 水平联邦 |
同态加密 | 在加密域进行计算 | 高度安全 | 高 | 高 | 水平和垂直联邦 |
多方安全计算 (MPC) | 分布式计算 | 高度安全 | 高 | 高 | 垂直联邦 |
SMPC+DP | 结合安全多方计算和差分隐私 | 非常高 | 高 | 高 | 高敏感场景 |
TEE (可信执行环境) | 硬件安全区 | 高度安全 | 低 | 低 | 需特殊硬件 |
3.2 安全聚合协议示例代码
以下是一个使用PySyft实现基本安全聚合的示例:
import torch
import syft as sy
# 初始化PySyft环境
hook = sy.TorchHook(torch)
# 创建模拟联邦学习的虚拟工作机
alice = sy.VirtualWorker(hook, id="alice")
bob = sy.VirtualWorker(hook, id="bob")
charlie = sy.VirtualWorker(hook, id="charlie")
secure_worker = sy.VirtualWorker(hook, id="secure_aggregator")
# 创建一个简单的模型
model = torch.nn.Linear(10, 1)
# 发送模型副本给每个参与方
model_alice = model.copy().send(alice)
model_bob = model.copy().send(bob)
model_charlie = model.copy().send(charlie)
# 模拟本地数据和训练
# 在实际应用中,每个参与方会有自己的数据和训练循环
data_alice = torch.randn(100, 10).send(alice)
labels_alice = torch.randn(100, 1).send(alice)
# 模拟本地数据和训练
data_bob = torch.randn(100, 10).send(bob)
labels_bob = torch.randn(100, 1).send(bob)
data_charlie = torch.randn(100, 10).send(charlie)
labels_charlie = torch.randn(100, 1).send(charlie)
# 简单的本地训练过程
optimizer_alice = torch.optim.SGD(params=model_alice.parameters(), lr=0.1)
optimizer_bob = torch.optim.SGD(params=model_bob.parameters(), lr=0.1)
optimizer_charlie = torch.optim.SGD(params=model_charlie.parameters(), lr=0.1)
# 执行一些训练步骤
for i in range(5): # 模拟几个本地训练步骤
# Alice的训练
pred_alice = model_alice(data_alice)
loss_alice = ((pred_alice - labels_alice) ** 2).sum()
loss_alice.backward()
optimizer_alice.step()
optimizer_alice.zero_grad()
# Bob的训练
pred_bob = model_bob(data_bob)
loss_bob = ((pred_bob - labels_bob) ** 2).sum()
loss_bob.backward()
optimizer_bob.step()
optimizer_bob.zero_grad()
# Charlie的训练
pred_charlie = model_charlie(data_charlie)
loss_charlie = ((pred_charlie - labels_charlie) ** 2).sum()
loss_charlie.backward()
optimizer_charlie.step()
optimizer_charlie.zero_grad()
# 现在实现安全聚合
# 1. 每个参与方加密其模型更新
def get_model_params(model):
"""获取模型参数"""
return [param.data for param in model.parameters()]
# 获取加密后的模型参数
encrypted_params_alice = [param.copy().encrypt(protocol="frac").send(secure_worker)
for param in get_model_params(model_alice)]
encrypted_params_bob = [param.copy().encrypt(protocol="frac").send(secure_worker)
for param in get_model_params(model_bob)]
encrypted_params_charlie = [param.copy().encrypt(protocol="frac").send(secure_worker)
for param in get_model_params(model_charlie)]
# 2. 在安全工作机上进行加密聚合
secure_aggregated_params = []
for p_a, p_b, p_c in zip(encrypted_params_alice, encrypted_params_bob, encrypted_params_charlie):
# 聚合模型参数(这里简单地取平均值)
aggregated = (p_a + p_b + p_c) / 3
secure_aggregated_params.append(aggregated)
# 3. 解密聚合后的参数并更新全局模型
decrypted_params = [param.get().decrypt() for param in secure_aggregated_params]
# 更新全局模型
for param, new_param in zip(model.parameters(), decrypted_params):
param.data.copy_(new_param)
print("已完成安全聚合,更新了全局模型")
# 检查模型是否成功更新
print("全局模型参数示例:")
for name, param in model.named_parameters():
print(f"{name}: {param.data[:2]}")
3.3 使用同态加密的安全聚合示例
同态加密允许在不解密的情况下对加密数据进行计算,是保护联邦学习隐私的强大工具。下面是一个使用TenSEAL库(基于CKKS同态加密方案)实现的安全聚合示例:
import torch
import tenseal as ts
import numpy as np
# 创建模拟数据
def create_client_data(num_clients, input_dim, num_samples):
client_data = []
for _ in range(num_clients):
X = np.random.randn(num_samples, input_dim).astype(np.float32)
y = (np.random.rand(num_samples) > 0.5).astype(np.float32)
client_data.append((X, y))
return client_data
# 模型定义
class LinearModel:
def __init__(self, input_dim):
self.weights = np.random.randn(input_dim).astype(np.float32)
self.bias = np.random.randn(1).astype(np.float32)
def predict(self, X):
return np.dot(X, self.weights) + self.bias
def train(self, X, y, learning_rate=0.01, epochs=5):
for _ in range(epochs):
predictions = self.predict(X)
errors = predictions - y
# 更新权重和偏置
gradient_weights = np.dot(X.T, errors) / len(y)
gradient_bias = np.mean(errors)
self.weights -= learning_rate * gradient_weights
self.bias -= learning_rate * gradient_bias
def get_params(self):
return self.weights, self.bias
def set_params(self, weights, bias):
self.weights = weights
self.bias = bias
# 初始化TenSEAL上下文 (同态加密)
def create_tenseal_context():
# 创建加密上下文
context = ts.context(
ts.SCHEME_TYPE.CKKS,
poly_modulus_degree=8192,
coeff_mod_bit_sizes=[60, 40, 40, 60]
)
context.global_scale = 2**40
context.generate_galois_keys()
return context
# 联邦学习与同态加密实现
def federated_learning_with_he(client_data, input_dim, num_rounds=3, local_epochs=2):
# 创建全局模型
global_model = LinearModel(input_dim)
# 创建加密上下文
context = create_tenseal_context()
print("开始联邦学习训练(使用同态加密)...")
for round_num in range(num_rounds):
print(f"轮次 {round_num+1}/{num_rounds}")
# 本地训练
local_models = []
for client_id, (X, y) in enumerate(client_data):
# 复制全局模型
local_model = LinearModel(input_dim)
global_weights, global_bias = global_model.get_params()
local_model.set_params(global_weights.copy(), global_bias.copy())
# 进行本地训练
local_model.train(X, y, epochs=local_epochs)
local_models.append(local_model)
# 加密并聚合模型更新
encrypted_weight_updates = []
encrypted_bias_updates = []
for local_model in local_models:
# 计算模型更新(与全局模型相比)
local_weights, local_bias = local_model.get_params()
global_weights, global_bias = global_model.get_params()
weight_update = local_weights - global_weights
bias_update = local_bias - global_bias
# 加密模型更新
encrypted_weights = ts.ckks_vector(context, weight_update)
encrypted_bias = ts.ckks_vector(context, bias_update)
encrypted_weight_updates.append(encrypted_weights)
encrypted_bias_updates.append(encrypted_bias)
# 同态加密下的聚合
aggregated_weights = encrypted_weight_updates[0]
for enc_weights in encrypted_weight_updates[1:]:
aggregated_weights += enc_weights
aggregated_bias = encrypted_bias_updates[0]
for enc_bias in encrypted_bias_updates[1:]:
aggregated_bias += enc_bias
# 计算平均值
num_clients = len(client_data)
aggregated_weights *= (1.0 / num_clients)
aggregated_bias *= (1.0 / num_clients)
# 解密聚合结果
decrypted_weight_update = np.array(aggregated_weights.decrypt())
decrypted_bias_update = np.array(aggregated_bias.decrypt())
# 更新全局模型
global_weights, global_bias = global_model.get_params()
global_model.set_params(
global_weights + decrypted_weight_update,
global_bias + decrypted_bias_update
)
return global_model
# 运行联邦学习
num_clients = 3
input_dim = 10
num_samples = 100
client_data = create_client_data(num_clients, input_dim, num_samples)
final_model = federated_learning_with_he(
client_data,
input_dim,
num_rounds=3,
local_epochs=2
)
# 评估模型
def evaluate_model(model, test_data):
X_test, y_test = test_data
predictions = model.predict(X_test)
binary_predictions = (predictions > 0.5).astype(np.float32)
accuracy = np.mean(binary_predictions == y_test)
return accuracy
# 创建测试数据
X_test = np.random.randn(50, input_dim).astype(np.float32)
y_test = (np.random.rand(50) > 0.5).astype(np.float32)
test_data = (X_test, y_test)
accuracy = evaluate_model(final_model, test_data)
print(f"\n最终模型准确率: {accuracy:.4f}")
3.4 多方安全计算示例
多方安全计算(MPC)是一种密码学技术,允许多个参与方共同计算一个函数,而不暴露各自的输入。下面是一个使用PySyft的MPC实现示例:
import torch
import syft as sy
# 初始化PySyft
hook = sy.TorchHook(torch)
# 创建两个虚拟工作机
alice = sy.VirtualWorker(hook, id="alice")
bob = sy.VirtualWorker(hook, id="bob")
# 创建安全工作机进行多方计算
secure_worker = sy.VirtualWorker(hook, id="secure_worker")
# 1. 准备数据 (每个参与方都有自己的私有数据)
# Alice的数据
x_alice = torch.tensor([0.1, 0.2, 0.3]).send(alice)
# Bob的数据
x_bob = torch.tensor([0.7, 0.8, 0.9]).send(bob)
# 2. 使用加法共享加密方案,将数据秘密共享给两个参与方
# 将Alice的数据加密并共享
encrypted_x_alice = x_alice.share(alice, bob, crypto_provider=secure_worker)
# 将Bob的数据加密并共享
encrypted_x_bob = x_bob.share(alice, bob, crypto_provider=secure_worker)
# 3. 在加密状态下执行计算
# 计算两个向量的点积(安全地)
encrypted_result = (encrypted_x_alice * encrypted_x_bob).sum()
# 4. 解密结果
decrypted_result = encrypted_result.get().float_precision()
print(f"安全计算的结果: {decrypted_result}")
# 5. 验证结果(与明文计算结果对比)
plaintext_result = (x_alice.get() * x_bob.get()).sum()
print(f"明文计算的结果: {plaintext_result}")
print(f"两种计算方法的误差: {abs(decrypted_result - plaintext_result)}")
3.5 模型聚合安全协议的流程图
+---------------------+ +---------------------+ +---------------------+
| 客户端 A | | 客户端 B | | 客户端 C |
+---------------------+ +---------------------+ +---------------------+
| 1. 接收全局模型 | | 1. 接收全局模型 | | 1. 接收全局模型 |
| 2. 使用本地数据训练 | | 2. 使用本地数据训练 | | 2. 使用本地数据训练 |
| 3. 计算模型更新 | | 3. 计算模型更新 | | 3. 计算模型更新 |
+---------------------+ +---------------------+ +---------------------+
| | |
| | |
v v v
+-----------------------------------------------------------------------+
| 安全聚合协议 |
+-----------------------------------------------------------------------+
| 1. 加密阶段: 各客户端加密其模型更新 |
| 2. 掩码阶段: 各客户端添加随机掩码,相互抵消的方式确保掩码和为零 |
| 3. 聚合阶段: 服务器聚合加密的更新,掩码相互抵消 |
| 4. 解密阶段: 协议确保只有最终聚合结果被解密,各客户端的输入保持隐私 |
+-----------------------------------------------------------------------+
|
v
+-----------------------------------------------------------------------+
| 服务器 |
+-----------------------------------------------------------------------+
| 1. 接收聚合后的模型更新 |
| 2. 应用到全局模型 |
| 3. 分发新的全局模型 |
+-----------------------------------------------------------------------+
4. 联邦学习系统的综合实例:联邦学习图像分类器
为了将今天所学的内容综合起来,让我们实现一个具有差分隐私保护的横向联邦学习图像分类系统:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
import numpy as np
from opacus import PrivacyEngine
import copy
import matplotlib.pyplot as plt
# 设置随机种子
torch.manual_seed(42)
np.random.seed(42)
# 1. 数据准备
transform = transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.5,), (0.5,))
])
# 加载MNIST数据集
mnist_train = torchvision.datasets.MNIST(root='./data', train=True, download=True, transform=transform)
mnist_test = torchvision.datasets.MNIST(root='./data', train=False, download=True, transform=transform)
# 模拟3个不同的客户端,将数据集水平划分
def split_data(dataset, num_clients):
"""水平划分数据集给不同客户端"""
num_items_per_client = len(dataset) // num_clients
client_datasets = []
for i in range(num_clients):
start_idx = i * num_items_per_client
end_idx = (i + 1) * num_items_per_client if i < num_clients - 1 else len(dataset)
indices = range(start_idx, end_idx)
client_datasets.append(torch.utils.data.Subset(dataset, indices))
return client_datasets
num_clients = 3
client_datasets = split_data(mnist_train, num_clients)
test_loader = torch.utils.data.DataLoader(mnist_test, batch_size=64, shuffle=False)
# 2. 模型定义
class MnistCNN(nn.Module):
def __init__(self):
super(MnistCNN, self).__init__()
self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
self.fc1 = nn.Linear(320, 50)
self.fc2 = nn.Linear(50, 10)
self.dropout = nn.Dropout2d()
self.pool = nn.MaxPool2d(2)
self.relu = nn.ReLU()
def forward(self, x):
x = self.relu(self.pool(self.conv1(x)))
x = self.relu(self.pool(self.dropout(self.conv2(x))))
x = x.view(-1, 320)
x = self.relu(self.fc1(x))
x = self.dropout(x)
x = self.fc2(x)
return x
# 3. 联邦学习实现
class FederatedLearningWithDP:
def __init__(self, global_model, client_datasets, test_loader,
num_rounds=5, local_epochs=1, learning_rate=0.01,
noise_multiplier=1.0, max_grad_norm=1.0, delta=1e-5):
self.global_model = global_model
self.client_datasets = client_datasets
self.test_loader = test_loader
self.num_rounds = num_rounds
self.local_epochs = local_epochs
self.learning_rate = learning_rate
self.noise_multiplier = noise_multiplier
self.max_grad_norm = max_grad_norm
self.delta = delta
self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
self.global_model.to(self.device)
print(f"使用设备: {self.device}")
def train_client(self, client_id, global_model_state):
"""在单个客户端上训练模型(带差分隐私)"""
# 复制全局模型
local_model = MnistCNN().to(self.device)
local_model.load_state_dict(global_model_state)
local_model.train()
# 准备数据
train_loader = torch.utils.data.DataLoader(
self.client_datasets[client_id], batch_size=32, shuffle=True
)
# 设置优化器
optimizer = optim.SGD(local_model.parameters(), lr=self.learning_rate)
criterion = nn.CrossEntropyLoss()
# 添加差分隐私
privacy_engine = PrivacyEngine()
model, optimizer, train_loader = privacy_engine.make_private(
module=local_model,
optimizer=optimizer,
data_loader=train_loader,
noise_multiplier=self.noise_multiplier,
max_grad_norm=self.max_grad_norm,
)
# 训练模型
for epoch in range(self.local_epochs):
for data, target in train_loader:
data, target = data.to(self.device), target.to(self.device)
optimizer.zero_grad()
output = model(data)
loss = criterion(output, target)
loss.backward()
optimizer.step()
# 计算隐私预算
epsilon = privacy_engine.get_epsilon(delta=self.delta)
print(f" 客户端 {client_id+1} 的隐私预算: ε = {epsilon:.2f}")
return local_model.state_dict()
def aggregate_models(self, model_states):
"""使用FedAvg聚合客户端模型"""
# 初始化一个空字典来存储聚合后的参数
global_dict = copy.deepcopy(model_states[0])
# 对所有模型参数求平均
for key in global_dict.keys():
global_dict[key] = torch.stack([states[key] for states in model_states]).mean(0)
return global_dict
def evaluate(self, model):
"""评估模型性能"""
model.eval()
test_loss = 0
correct = 0
criterion = nn.CrossEntropyLoss()
with torch.no_grad():
for data, target in self.test_loader:
data, target = data.to(self.device), target.to(self.device)
output = model(data)
test_loss += criterion(output, target).item()
pred = output.argmax(dim=1, keepdim=True)
correct += pred.eq(target.view_as(pred)).sum().item()
test_loss /= len(self.test_loader.dataset)
accuracy = 100. * correct / len(self.test_loader.dataset)
return test_loss, accuracy
def train(self):
"""执行联邦学习过程"""
accuracies = []
for round_num in range(self.num_rounds):
print(f"联邦学习轮次 {round_num+1}/{self.num_rounds}")
# 收集每个客户端的模型
client_models = []
for client_id in range(len(self.client_datasets)):
print(f" 训练客户端 {client_id+1}")
client_model = self.train_client(client_id, copy.deepcopy(self.global_model.state_dict()))
client_models.append(client_model)
# 聚合模型
global_dict = self.aggregate_models(client_models)
self.global_model.load_state_dict(global_dict)
# 评估全局模型
test_loss, accuracy = self.evaluate(self.global_model)
accuracies.append(accuracy)
print(f" 轮次 {round_num+1} 完成, 测试损失: {test_loss:.4f}, 准确率: {accuracy:.2f}%")
return self.global_model, accuracies
# 4. 执行联邦学习
global_model = MnistCNN()
fl_with_dp = FederatedLearningWithDP(
global_model=global_model,
client_datasets=client_datasets,
test_loader=test_loader,
num_rounds=10,
local_epochs=1,
learning_rate=0.01,
noise_multiplier=1.0, # 控制添加噪声的程度
max_grad_norm=1.0
)
print("开始联邦学习训练(带差分隐私保护)...")
final_model, accuracies = fl_with_dp.train()
# 5. 可视化结果
plt.figure(figsize=(10, 5))
plt.plot(range(1, len(accuracies) + 1), accuracies, marker='o')
plt.xlabel('联邦学习轮次')
plt.ylabel('测试准确率 (%)')
plt.title('联邦学习过程中的模型准确率')
plt.grid(True)
plt.savefig('federated_learning_accuracy.png')
plt.close()
print(f"\n联邦学习完成! 最终模型准确率: {accuracies[-1]:.2f}%")
5. 总结与实践应用
今天我们深入学习了联邦学习系统的三个核心内容:差分隐私机制、横向/纵向联邦学习的数据划分策略,以及模型聚合的安全协议。
5.1 关键要点总结
差分隐私是一种数学框架,通过添加经过校准的噪声来保护个体隐私,同时允许对数据集进行有用的分析。
- 常用机制包括拉普拉斯机制、高斯机制和指数机制
- 隐私预算(ε)控制着隐私保护强度与实用性之间的权衡
横向联邦学习适用于参与方拥有相同特征空间但不同样本的情况。
- 实现相对简单,通常使用FedAvg等聚合算法
- 主要保护样本隐私
纵向联邦学习适用于参与方拥有相同样本但不同特征的情况。
- 实现较为复杂,需要安全的实体对齐和协作训练
- 主要保护特征隐私
安全聚合协议确保在聚合模型参数时不会泄露个体参与方的信息。
- 常见方法包括安全聚合(SecAgg)、同态加密和多方安全计算(MPC)
- 不同方法在安全性、通信开销和计算开销上各有权衡
5.2 实际应用场景
联邦学习系统的典型应用场景包括:
- 医疗健康: 不同医院可以在不共享病人敏感数据的情况下,协作训练疾病诊断模型
- 金融科技: 银行、保险和信贷机构可以共同构建风险评估模型,同时保护客户隐私
- 智能手机: Google已经在Android设备上应用联邦学习来训练键盘预测和语音识别模型
- 智能交通: 不同车企和交通部门可以共享数据来训练交通预测模型
- 跨企业协作: 不同企业可以在不泄露商业机密的情况下共同训练模型
5.3 学习资源推荐
要深入学习联邦学习系统,以下资源非常有用:
理论学习:
- 《联邦学习》(杨强等著)
- “Communication-Efficient Learning of Deep Networks from Decentralized Data” (联邦学习原始论文)
- “Deep Learning with Differential Privacy” (差分隐私在深度学习中的应用)
编程资源:
- PySyft: 用于隐私保护机器学习的Python库
- Flower: 专注于联邦学习的开源框架
- TensorFlow Federated: Google的联邦学习框架
- Opacus: Facebook的PyTorch差分隐私库
实践项目:
- 构建一个具有差分隐私保护的联邦学习系统
- 尝试实现横向和纵向联邦学习的对比实验
- 探索不同安全聚合协议的性能比较
清华大学全五版的《DeepSeek教程》完整的文档需要的朋友,关注我私信:deepseek 即可获得。
怎么样今天的内容还满意吗?再次感谢朋友们的观看,关注GZH:凡人的AI工具箱,回复666,送您价值199的AI大礼包。最后,祝您早日实现财务自由,还请给个赞,谢谢!