Pytorch深度学习框架60天进阶学习计划 - 第39天:联邦学习系统

发布于:2025-04-12 ⋅ 阅读:(44) ⋅ 点赞:(0)

Pytorch深度学习框架60天进阶学习计划 - 第39天:联邦学习系统

今天我们将深入学习三个关键部分:差分隐私机制横向/纵向联邦学习的数据划分策略,以及模型聚合的安全协议

1. 差分隐私机制详解

1.1 什么是差分隐私?

差分隐私(Differential Privacy,DP)是一种数学框架,它允许我们在保护个体隐私的同时从数据中提取有用信息。简单来说,差分隐私确保无论数据集中是否包含某个个体的数据,查询结果都应该大致相同。

差分隐私的核心思想可以用一个形象的比喻来理解:想象你是一名统计学家,需要了解某个城市居民的平均收入。如果你直接收集并公布数据,可能会泄露个人信息。使用差分隐私,你会在真实统计结果上添加一定的"噪声",这样即使知道结果,也无法确定任何特定个体的信息。

差分隐私的关键特性:

  • 具有数学上的严格保证
  • 可以量化隐私保护的程度(通过ε参数)
  • 具有组合特性,可以跟踪多次查询的隐私损失

1.2 差分隐私的数学定义

在数学上,如果一个随机化算法M满足以下条件,我们就说它提供了ε-差分隐私:

对于任意两个仅相差一个元素的数据集D和D’,以及算法M的所有可能输出S:

P r [ M ( D ) ∈ S ] ≤ e ε × P r [ M ( D ′ ) ∈ S ] Pr[M(D) \in S] \leq e^{\varepsilon} \times Pr[M(D') \in S] Pr[M(D)S]eε×Pr[M(D)S]

这里,ε是隐私预算,控制着隐私保护的强度。ε越小,隐私保护越强,但准确性可能降低;ε越大,准确性提高,但隐私保护减弱。

1.3 常用的差分隐私机制

下面是几种在联邦学习中常用的差分隐私机制:

机制名称 适用场景 原理 优点 缺点
拉普拉斯机制 数值查询 添加拉普拉斯分布噪声 实现简单,理论保证好 对异常值敏感
高斯机制 数值查询 添加高斯分布噪声 与许多机器学习算法兼容性好 提供的是(ε,δ)-DP,而非纯ε-DP
指数机制 非数值查询 基于效用函数概率选择输出 适用范围广 计算复杂度高
随机响应 布尔查询 以一定概率翻转真实答案 直观易懂 仅适用于简单查询

1.4 PyTorch中实现差分隐私的示例代码

PyTorch提供了opacus库来简化差分隐私训练的实现。下面是一个使用差分隐私训练简单神经网络的例子:

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from opacus import PrivacyEngine
import numpy as np

# 设置随机种子以确保结果可复现
torch.manual_seed(42)
np.random.seed(42)

# 1. 创建一个简单的神经网络
class SimpleNN(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim):
        super(SimpleNN, self).__init__()
        self.layer1 = nn.Linear(input_dim, hidden_dim)
        self.activation = nn.ReLU()
        self.layer2 = nn.Linear(hidden_dim, output_dim)
        
    def forward(self, x):
        x = self.layer1(x)
        x = self.activation(x)
        x = self.layer2(x)
        return x

# 2. 生成一些模拟数据
def generate_sample_data(n_samples=1000, input_dim=20):
    X = np.random.randn(n_samples, input_dim)
    # 创建二分类标签
    y = np.random.randint(0, 2, size=n_samples)
    return X, y

# 3. 准备数据
X, y = generate_sample_data()
X_tensor = torch.FloatTensor(X)
y_tensor = torch.LongTensor(y)
dataset = TensorDataset(X_tensor, y_tensor)
dataloader = DataLoader(dataset, batch_size=64, shuffle=True)

# 4. 初始化模型和优化器
input_dim = X.shape[1]
hidden_dim = 50
output_dim = 2  # 二分类

model = SimpleNN(input_dim, hidden_dim, output_dim)
optimizer = optim.SGD(model.parameters(), lr=0.05)
criterion = nn.CrossEntropyLoss()

# 5. 添加差分隐私
privacy_engine = PrivacyEngine()
model, optimizer, dataloader = privacy_engine.make_private(
    module=model,
    optimizer=optimizer,
    data_loader=dataloader,
    noise_multiplier=1.0,  # 控制添加的噪声大小
    max_grad_norm=1.0,     # 梯度裁剪范数
)

# 6. 训练模型
def train(epochs=10):
    model.train()
    for epoch in range(epochs):
        total_loss = 0
        for data, target in dataloader:
            optimizer.zero_grad()
            output = model(data)
            loss = criterion(output, target)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        
        # 每个epoch打印隐私预算
        epsilon = privacy_engine.get_epsilon(delta=1e-5)
        print(f"Epoch {epoch+1}/{epochs}, Loss: {total_loss/len(dataloader):.4f}, ε: {epsilon:.2f}")

# 7. 执行训练
print("开始使用差分隐私训练模型...")
train()

# 8. 在没有差分隐私的情况下训练同样的模型进行对比
print("\n开始训练没有差分隐私保护的基准模型...")
model_no_dp = SimpleNN(input_dim, hidden_dim, output_dim)
optimizer_no_dp = optim.SGD(model_no_dp.parameters(), lr=0.05)
dataloader_no_dp = DataLoader(dataset, batch_size=64, shuffle=True)

model_no_dp.train()
for epoch in range(10):
    total_loss = 0
    for data, target in dataloader_no_dp:
        optimizer_no_dp.zero_grad()
        output = model_no_dp(data)
        loss = criterion(output, target)
        loss.backward()
        optimizer_no_dp.step()
        total_loss += loss.item()
    
    print(f"Epoch {epoch+1}/10, Loss: {total_loss/len(dataloader_no_dp):.4f}")

# 9. 比较两个模型的准确率
model.eval()
model_no_dp.eval()

correct_dp = 0
correct_no_dp = 0
with torch.no_grad():
    for data, target in dataloader:
        output_dp = model(data)
        pred_dp = output_dp.argmax(dim=1)
        correct_dp += pred_dp.eq(target).sum().item()
        
        output_no_dp = model_no_dp(data)
        pred_no_dp = output_no_dp.argmax(dim=1)
        correct_no_dp += pred_no_dp.eq(target).sum().item()

print(f"\n差分隐私模型准确率: {100. * correct_dp / len(dataset):.2f}%")
print(f"基准模型准确率: {100. * correct_no_dp / len(dataset):.2f}%")

1.5 差分隐私在联邦学习中的应用流程图

+-------------------+    +----------------------+    +------------------------+
| 本地数据训练阶段     |    | 差分隐私保护阶段        |    | 模型聚合与更新阶段        |
+-------------------+    +----------------------+    +------------------------+
| 1. 客户端接收模型   |    | 1. 计算模型参数梯度     |    | 1. 服务器收集受保护的      |
| 2. 使用本地数据     | -> | 2. 应用梯度裁剪        | -> |    梯度或模型参数         |
|    进行前向计算     |    | 3. 添加校准噪声        |    | 2. 聚合参数              |
| 3. 计算损失        |    | 4. 更新本地模型        |    | 3. 更新全局模型           |
+-------------------+    +----------------------+    +------------------------+

2. 横向联邦学习与纵向联邦学习

联邦学习按照数据分割方式主要分为横向联邦学习和纵向联邦学习。理解它们的区别对于设计合适的联邦学习系统至关重要。

2.1 横向联邦学习与纵向联邦学习的比较

特性 横向联邦学习 (HFL) 纵向联邦学习 (VFL)
数据特点 各参与方拥有相同特征空间的不同样本 各参与方拥有相同样本的不同特征
典型应用场景 不同地区医院拥有相同类型的病例数据 银行和电商平台拥有同一用户的不同维度信息
实现复杂度 相对简单 更为复杂,需要对齐实体
隐私保护重点 样本隐私 特征隐私
模型构建方式 同构模型,训练目标相同 异构模型,可能需要复杂协议进行协作
通信效率 每轮需传输完整模型参数 只需传输部分参数和中间结果

2.2 横向联邦学习示例代码

下面是一个使用PyTorch实现横向联邦学习的简化示例:

import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from sklearn.datasets import load_digits
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

# 设置随机种子
torch.manual_seed(42)
np.random.seed(42)

# 1. 准备数据集并模拟分布式环境
digits = load_digits()
X, y = digits.data, digits.target
X = StandardScaler().fit_transform(X)  # 标准化数据

# 将数据分成3个客户端
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# 模拟3个不同的客户端数据集(水平划分)
client_data = []
num_clients = 3
client_proportions = [0.4, 0.3, 0.3]  # 每个客户端拥有的数据比例

start_idx = 0
for i in range(num_clients):
    end_idx = start_idx + int(len(X_train) * client_proportions[i])
    client_data.append((
        torch.FloatTensor(X_train[start_idx:end_idx]),
        torch.LongTensor(y_train[start_idx:end_idx])
    ))
    start_idx = end_idx

# 2. 定义模型
class DigitClassifier(nn.Module):
    def __init__(self, input_dim=64, hidden_dim=50, output_dim=10):
        super(DigitClassifier, self).__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(hidden_dim, output_dim)
        
    def forward(self, x):
        x = self.fc1(x)
        x = self.relu(x)
        x = self.fc2(x)
        return x

# 3. 定义联邦学习过程
class FederatedLearning:
    def __init__(self, model, client_data, test_data, num_rounds=10, local_epochs=5, lr=0.01):
        self.model = model
        self.client_data = client_data
        self.test_data = test_data
        self.num_rounds = num_rounds
        self.local_epochs = local_epochs
        self.lr = lr
        self.criterion = nn.CrossEntropyLoss()
        
    def train_client(self, client_id, global_model):
        """在单个客户端上训练模型"""
        # 复制全局模型参数
        local_model = DigitClassifier()
        local_model.load_state_dict(global_model.state_dict())
        
        optimizer = optim.SGD(local_model.parameters(), lr=self.lr)
        local_model.train()
        
        X_client, y_client = self.client_data[client_id]
        
        for _ in range(self.local_epochs):
            optimizer.zero_grad()
            outputs = local_model(X_client)
            loss = self.criterion(outputs, y_client)
            loss.backward()
            optimizer.step()
            
        return local_model.state_dict()
    
    def aggregate_models(self, model_dicts):
        """使用FedAvg算法聚合客户端模型"""
        client_sizes = [len(self.client_data[i][0]) for i in range(len(self.client_data))]
        total_size = sum(client_sizes)
        
        # 初始化全局模型字典
        global_dict = {}
        for k in model_dicts[0].keys():
            global_dict[k] = torch.zeros_like(model_dicts[0][k])
            
        # 加权平均参数
        for i, model_dict in enumerate(model_dicts):
            weight = client_sizes[i] / total_size
            for k in global_dict.keys():
                global_dict[k] += model_dict[k] * weight
                
        return global_dict
    
    def evaluate(self, model):
        """评估模型性能"""
        model.eval()
        X_test, y_test = self.test_data
        with torch.no_grad():
            outputs = model(X_test)
            _, predicted = torch.max(outputs.data, 1)
            accuracy = (predicted == y_test).sum().item() / y_test.size(0)
        return accuracy
    
    def train(self):
        """执行联邦学习过程"""
        global_model = self.model
        
        for round_num in range(self.num_rounds):
            print(f"联邦学习轮次 {round_num+1}/{self.num_rounds}")
            
            # 收集每个客户端的模型
            client_models = []
            for client_id in range(len(self.client_data)):
                print(f"  训练客户端 {client_id+1}")
                client_model = self.train_client(client_id, global_model)
                client_models.append(client_model)
            
            # 聚合模型
            global_dict = self.aggregate_models(client_models)
            global_model.load_state_dict(global_dict)
            
            # 评估全局模型
            accuracy = self.evaluate(global_model)
            print(f"  轮次 {round_num+1} 完成, 测试准确率: {accuracy:.4f}")
            
        return global_model

# 4. 运行联邦学习
model = DigitClassifier()
X_test_tensor = torch.FloatTensor(X_test)
y_test_tensor = torch.LongTensor(y_test)
test_data = (X_test_tensor, y_test_tensor)

federated_learning = FederatedLearning(
    model=model, 
    client_data=client_data, 
    test_data=test_data, 
    num_rounds=10, 
    local_epochs=5, 
    lr=0.01
)

final_model = federated_learning.train()

# 5. 评估最终模型
final_accuracy = federated_learning.evaluate(final_model)
print(f"\n最终模型准确率: {final_accuracy:.4f}")

2.3 纵向联邦学习示例代码

下面是一个使用PyTorch实现的纵向联邦学习简化示例:

import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

# 设置随机种子
torch.manual_seed(42)
np.random.seed(42)

# 1. 加载数据集
cancer = load_breast_cancer()
X, y = cancer.data, cancer.target
X = StandardScaler().fit_transform(X)  # 标准化数据

# 划分训练集和测试集
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# 2. 垂直划分特征(假设有两个参与方)
# 参与方A拥有前15个特征
# 参与方B拥有后15个特征
feature_split = 15
X_train_A = X_train[:, :feature_split]
X_train_B = X_train[:, feature_split:]

X_test_A = X_test[:, :feature_split]
X_test_B = X_test[:, feature_split:]

# 转换为PyTorch张量
X_train_A_tensor = torch.FloatTensor(X_train_A)
X_train_B_tensor = torch.FloatTensor(X_train_B)
y_train_tensor = torch.FloatTensor(y_train)

X_test_A_tensor = torch.FloatTensor(X_test_A)
X_test_B_tensor = torch.FloatTensor(X_test_B)
y_test_tensor = torch.FloatTensor(y_test)

# 3. 定义参与方模型
class PartyModel(nn.Module):
    def __init__(self, input_dim, embedding_dim):
        super(PartyModel, self).__init__()
        self.fc = nn.Sequential(
            nn.Linear(input_dim, embedding_dim),
            nn.ReLU()
        )
        
    def forward(self, x):
        return self.fc(x)

# 4. 定义顶层模型(由可信的第三方持有)
class TopModel(nn.Module):
    def __init__(self, input_dim):
        super(TopModel, self).__init__()
        self.fc = nn.Sequential(
            nn.Linear(input_dim, 1),
            nn.Sigmoid()
        )
        
    def forward(self, x):
        return self.fc(x)

# 5. 纵向联邦学习训练过程
class VerticalFederatedLearning:
    def __init__(self, party_A_model, party_B_model, top_model, 
                 party_A_data, party_B_data, labels, 
                 test_A_data, test_B_data, test_labels,
                 epochs=50, lr=0.01):
        self.party_A_model = party_A_model
        self.party_B_model = party_B_model
        self.top_model = top_model
        self.party_A_data = party_A_data
        self.party_B_data = party_B_data
        self.labels = labels
        self.test_A_data = test_A_data
        self.test_B_data = test_B_data
        self.test_labels = test_labels
        self.epochs = epochs
        self.lr = lr
        
        # 初始化优化器
        self.optimizer_A = optim.Adam(self.party_A_model.parameters(), lr=lr)
        self.optimizer_B = optim.Adam(self.party_B_model.parameters(), lr=lr)
        self.optimizer_top = optim.Adam(self.top_model.parameters(), lr=lr)
        
        # 损失函数
        self.criterion = nn.BCELoss()
        
    def train(self):
        for epoch in range(self.epochs):
            # 前向传播
            # 1. 各参与方计算自己的嵌入向量
            embedding_A = self.party_A_model(self.party_A_data)
            embedding_B = self.party_B_model(self.party_B_data)
            
            # 2. 将嵌入向量发送给可信的第三方
            combined_embedding = torch.cat((embedding_A, embedding_B), dim=1)
            
            # 3. 第三方使用顶层模型进行预测
            predictions = self.top_model(combined_embedding).squeeze()
            
            # 4. 计算损失
            loss = self.criterion(predictions, self.labels)
            
            # 反向传播和优化
            self.optimizer_A.zero_grad()
            self.optimizer_B.zero_grad()
            self.optimizer_top.zero_grad()
            
            loss.backward()
            
            self.optimizer_A.step()
            self.optimizer_B.step()
            self.optimizer_top.step()
            
            # 每10个epoch评估一次模型
            if (epoch + 1) % 10 == 0:
                accuracy = self.evaluate()
                print(f"Epoch {epoch+1}/{self.epochs}, Loss: {loss.item():.4f}, Accuracy: {accuracy:.4f}")
    
    def evaluate(self):
        self.party_A_model.eval()
        self.party_B_model.eval()
        self.top_model.eval()
        
        with torch.no_grad():
            embedding_A = self.party_A_model(self.test_A_data)
            embedding_B = self.party_B_model(self.test_B_data)
            
            combined_embedding = torch.cat((embedding_A, embedding_B), dim=1)
            predictions = self.top_model(combined_embedding).squeeze()
            
            predicted_labels = (predictions > 0.5).float()
            accuracy = (predicted_labels == self.test_labels).float().mean()
            
        self.party_A_model.train()
        self.party_B_model.train()
        self.top_model.train()
        
        return accuracy.item()

# 6. 初始化模型
embedding_dim = 8
party_A_model = PartyModel(input_dim=X_train_A.shape[1], embedding_dim=embedding_dim)
party_B_model = PartyModel(input_dim=X_train_B.shape[1], embedding_dim=embedding_dim)
top_model = TopModel(input_dim=embedding_dim*2)  # 两个参与方的嵌入向量拼接

# 7. 创建并执行纵向联邦学习
vfl = VerticalFederatedLearning(
    party_A_model=party_A_model,
    party_B_model=party_B_model,
    top_model=top_model,
    party_A_data=X_train_A_tensor,
    party_B_data=X_train_B_tensor,
    labels=y_train_tensor,
    test_A_data=X_test_A_tensor,
    test_B_data=X_test_B_tensor,
    test_labels=y_test_tensor,
    epochs=100,
    lr=0.01
)

print("开始纵向联邦学习训练...")
vfl.train()

# 8. 评估最终模型
final_accuracy = vfl.evaluate()
print(f"\n最终模型准确率: {final_accuracy:.4f}")

2.4 横向与纵向联邦学习的数据划分策略流程图

横向联邦学习数据划分:
+-------------+    +-------------+    +-------------+
| 客户端 A     |    | 客户端 B     |    | 客户端 C     |
+-------------+    +-------------+    +-------------+
| 样本 1-1000  |    | 样本 1001-   |    | 样本 2001-  |
| 特征 1-100   |    | 2000        |    | 3000        |
|             |     | 特征 1-100  |    | 特征 1-100   |
+-------------+    +-------------+    +-------------+
        |                 |                 |
        v                 v                 v
    +-------------------------------------------+
    |              中央服务器                     |
    | (聚合模型参数,但不获取原始数据)               |
    +-------------------------------------------+

纵向联邦学习数据划分:
+-------------+    +-------------+    +-------------+
| 客户端 A     |    | 客户端 B     |    | 客户端 C     |
+-------------+    +-------------+    +-------------+
| 样本 1-3000  |    | 样本 1-3000 |    | 样本 1-3000  |
| 特征 1-30    |    | 特征 31-60  |    | 特征 61-100  |
+-------------+    +-------------+    +-------------+
        |                 |                 |
        v                 v                 v
    +-------------------------------------------+
    |              第三方协调者                   |
    | (协调训练过程,但不获取原始特征)               |
    +-------------------------------------------+

3. 模型聚合的安全协议

联邦学习中的模型聚合是一个关键步骤,需要安全协议来保护参与方的隐私。

3.1 常见的安全聚合协议

下面是几种常见的安全聚合协议比较:

协议名称 核心思想 安全保证 通信开销 计算开销 适用场景
安全聚合 (SecAgg) 使用秘密共享和掩码 隐藏个体更新 中等 中等 水平联邦
同态加密 在加密域进行计算 高度安全 水平和垂直联邦
多方安全计算 (MPC) 分布式计算 高度安全 垂直联邦
SMPC+DP 结合安全多方计算和差分隐私 非常高 高敏感场景
TEE (可信执行环境) 硬件安全区 高度安全 需特殊硬件

3.2 安全聚合协议示例代码

以下是一个使用PySyft实现基本安全聚合的示例:

import torch
import syft as sy

# 初始化PySyft环境
hook = sy.TorchHook(torch)

# 创建模拟联邦学习的虚拟工作机
alice = sy.VirtualWorker(hook, id="alice")
bob = sy.VirtualWorker(hook, id="bob")
charlie = sy.VirtualWorker(hook, id="charlie")
secure_worker = sy.VirtualWorker(hook, id="secure_aggregator")

# 创建一个简单的模型
model = torch.nn.Linear(10, 1)

# 发送模型副本给每个参与方
model_alice = model.copy().send(alice)
model_bob = model.copy().send(bob)
model_charlie = model.copy().send(charlie)

# 模拟本地数据和训练
# 在实际应用中,每个参与方会有自己的数据和训练循环
data_alice = torch.randn(100, 10).send(alice)
labels_alice = torch.randn(100, 1).send(alice)

# 模拟本地数据和训练
data_bob = torch.randn(100, 10).send(bob)
labels_bob = torch.randn(100, 1).send(bob)

data_charlie = torch.randn(100, 10).send(charlie)
labels_charlie = torch.randn(100, 1).send(charlie)

# 简单的本地训练过程
optimizer_alice = torch.optim.SGD(params=model_alice.parameters(), lr=0.1)
optimizer_bob = torch.optim.SGD(params=model_bob.parameters(), lr=0.1)
optimizer_charlie = torch.optim.SGD(params=model_charlie.parameters(), lr=0.1)

# 执行一些训练步骤
for i in range(5):  # 模拟几个本地训练步骤
    # Alice的训练
    pred_alice = model_alice(data_alice)
    loss_alice = ((pred_alice - labels_alice) ** 2).sum()
    loss_alice.backward()
    optimizer_alice.step()
    optimizer_alice.zero_grad()
    
    # Bob的训练
    pred_bob = model_bob(data_bob)
    loss_bob = ((pred_bob - labels_bob) ** 2).sum()
    loss_bob.backward()
    optimizer_bob.step()
    optimizer_bob.zero_grad()
    
    # Charlie的训练
    pred_charlie = model_charlie(data_charlie)
    loss_charlie = ((pred_charlie - labels_charlie) ** 2).sum()
    loss_charlie.backward()
    optimizer_charlie.step()
    optimizer_charlie.zero_grad()

# 现在实现安全聚合
# 1. 每个参与方加密其模型更新
def get_model_params(model):
    """获取模型参数"""
    return [param.data for param in model.parameters()]

# 获取加密后的模型参数
encrypted_params_alice = [param.copy().encrypt(protocol="frac").send(secure_worker) 
                         for param in get_model_params(model_alice)]
encrypted_params_bob = [param.copy().encrypt(protocol="frac").send(secure_worker) 
                       for param in get_model_params(model_bob)]
encrypted_params_charlie = [param.copy().encrypt(protocol="frac").send(secure_worker) 
                          for param in get_model_params(model_charlie)]

# 2. 在安全工作机上进行加密聚合
secure_aggregated_params = []
for p_a, p_b, p_c in zip(encrypted_params_alice, encrypted_params_bob, encrypted_params_charlie):
    # 聚合模型参数(这里简单地取平均值)
    aggregated = (p_a + p_b + p_c) / 3
    secure_aggregated_params.append(aggregated)

# 3. 解密聚合后的参数并更新全局模型
decrypted_params = [param.get().decrypt() for param in secure_aggregated_params]

# 更新全局模型
for param, new_param in zip(model.parameters(), decrypted_params):
    param.data.copy_(new_param)

print("已完成安全聚合,更新了全局模型")

# 检查模型是否成功更新
print("全局模型参数示例:")
for name, param in model.named_parameters():
    print(f"{name}: {param.data[:2]}")

3.3 使用同态加密的安全聚合示例

同态加密允许在不解密的情况下对加密数据进行计算,是保护联邦学习隐私的强大工具。下面是一个使用TenSEAL库(基于CKKS同态加密方案)实现的安全聚合示例:

import torch
import tenseal as ts
import numpy as np

# 创建模拟数据
def create_client_data(num_clients, input_dim, num_samples):
    client_data = []
    for _ in range(num_clients):
        X = np.random.randn(num_samples, input_dim).astype(np.float32)
        y = (np.random.rand(num_samples) > 0.5).astype(np.float32)
        client_data.append((X, y))
    return client_data

# 模型定义
class LinearModel:
    def __init__(self, input_dim):
        self.weights = np.random.randn(input_dim).astype(np.float32)
        self.bias = np.random.randn(1).astype(np.float32)
    
    def predict(self, X):
        return np.dot(X, self.weights) + self.bias
    
    def train(self, X, y, learning_rate=0.01, epochs=5):
        for _ in range(epochs):
            predictions = self.predict(X)
            errors = predictions - y
            
            # 更新权重和偏置
            gradient_weights = np.dot(X.T, errors) / len(y)
            gradient_bias = np.mean(errors)
            
            self.weights -= learning_rate * gradient_weights
            self.bias -= learning_rate * gradient_bias
    
    def get_params(self):
        return self.weights, self.bias
    
    def set_params(self, weights, bias):
        self.weights = weights
        self.bias = bias

# 初始化TenSEAL上下文 (同态加密)
def create_tenseal_context():
    # 创建加密上下文
    context = ts.context(
        ts.SCHEME_TYPE.CKKS,
        poly_modulus_degree=8192,
        coeff_mod_bit_sizes=[60, 40, 40, 60]
    )
    context.global_scale = 2**40
    context.generate_galois_keys()
    return context

# 联邦学习与同态加密实现
def federated_learning_with_he(client_data, input_dim, num_rounds=3, local_epochs=2):
    # 创建全局模型
    global_model = LinearModel(input_dim)
    
    # 创建加密上下文
    context = create_tenseal_context()
    
    print("开始联邦学习训练(使用同态加密)...")
    
    for round_num in range(num_rounds):
        print(f"轮次 {round_num+1}/{num_rounds}")
        
        # 本地训练
        local_models = []
        for client_id, (X, y) in enumerate(client_data):
            # 复制全局模型
            local_model = LinearModel(input_dim)
            global_weights, global_bias = global_model.get_params()
            local_model.set_params(global_weights.copy(), global_bias.copy())
            
            # 进行本地训练
            local_model.train(X, y, epochs=local_epochs)
            local_models.append(local_model)
        
        # 加密并聚合模型更新
        encrypted_weight_updates = []
        encrypted_bias_updates = []
        
        for local_model in local_models:
            # 计算模型更新(与全局模型相比)
            local_weights, local_bias = local_model.get_params()
            global_weights, global_bias = global_model.get_params()
            
            weight_update = local_weights - global_weights
            bias_update = local_bias - global_bias
            
            # 加密模型更新
            encrypted_weights = ts.ckks_vector(context, weight_update)
            encrypted_bias = ts.ckks_vector(context, bias_update)
            
            encrypted_weight_updates.append(encrypted_weights)
            encrypted_bias_updates.append(encrypted_bias)
        
        # 同态加密下的聚合
        aggregated_weights = encrypted_weight_updates[0]
        for enc_weights in encrypted_weight_updates[1:]:
            aggregated_weights += enc_weights
        
        aggregated_bias = encrypted_bias_updates[0]
        for enc_bias in encrypted_bias_updates[1:]:
            aggregated_bias += enc_bias
        
        # 计算平均值
        num_clients = len(client_data)
        aggregated_weights *= (1.0 / num_clients)
        aggregated_bias *= (1.0 / num_clients)
        
        # 解密聚合结果
        decrypted_weight_update = np.array(aggregated_weights.decrypt())
        decrypted_bias_update = np.array(aggregated_bias.decrypt())
        
        # 更新全局模型
        global_weights, global_bias = global_model.get_params()
        global_model.set_params(
            global_weights + decrypted_weight_update,
            global_bias + decrypted_bias_update
        )
        
    return global_model

# 运行联邦学习
num_clients = 3
input_dim = 10
num_samples = 100
client_data = create_client_data(num_clients, input_dim, num_samples)

final_model = federated_learning_with_he(
    client_data, 
    input_dim, 
    num_rounds=3, 
    local_epochs=2
)

# 评估模型
def evaluate_model(model, test_data):
    X_test, y_test = test_data
    predictions = model.predict(X_test)
    binary_predictions = (predictions > 0.5).astype(np.float32)
    accuracy = np.mean(binary_predictions == y_test)
    return accuracy

# 创建测试数据
X_test = np.random.randn(50, input_dim).astype(np.float32)
y_test = (np.random.rand(50) > 0.5).astype(np.float32)
test_data = (X_test, y_test)

accuracy = evaluate_model(final_model, test_data)
print(f"\n最终模型准确率: {accuracy:.4f}")

3.4 多方安全计算示例

多方安全计算(MPC)是一种密码学技术,允许多个参与方共同计算一个函数,而不暴露各自的输入。下面是一个使用PySyft的MPC实现示例:

import torch
import syft as sy

# 初始化PySyft
hook = sy.TorchHook(torch)

# 创建两个虚拟工作机
alice = sy.VirtualWorker(hook, id="alice")
bob = sy.VirtualWorker(hook, id="bob")

# 创建安全工作机进行多方计算
secure_worker = sy.VirtualWorker(hook, id="secure_worker")

# 1. 准备数据 (每个参与方都有自己的私有数据)
# Alice的数据
x_alice = torch.tensor([0.1, 0.2, 0.3]).send(alice)
# Bob的数据
x_bob = torch.tensor([0.7, 0.8, 0.9]).send(bob)

# 2. 使用加法共享加密方案,将数据秘密共享给两个参与方
# 将Alice的数据加密并共享
encrypted_x_alice = x_alice.share(alice, bob, crypto_provider=secure_worker)
# 将Bob的数据加密并共享
encrypted_x_bob = x_bob.share(alice, bob, crypto_provider=secure_worker)

# 3. 在加密状态下执行计算
# 计算两个向量的点积(安全地)
encrypted_result = (encrypted_x_alice * encrypted_x_bob).sum()

# 4. 解密结果
decrypted_result = encrypted_result.get().float_precision()

print(f"安全计算的结果: {decrypted_result}")

# 5. 验证结果(与明文计算结果对比)
plaintext_result = (x_alice.get() * x_bob.get()).sum()
print(f"明文计算的结果: {plaintext_result}")
print(f"两种计算方法的误差: {abs(decrypted_result - plaintext_result)}")

3.5 模型聚合安全协议的流程图

+---------------------+   +---------------------+   +---------------------+
| 客户端 A             |   | 客户端 B             |   | 客户端 C             |
+---------------------+   +---------------------+   +---------------------+
| 1. 接收全局模型       |   | 1. 接收全局模型       |   | 1. 接收全局模型       |
| 2. 使用本地数据训练    |   | 2. 使用本地数据训练    |   | 2. 使用本地数据训练   |
| 3. 计算模型更新       |   | 3. 计算模型更新       |   | 3. 计算模型更新       |
+---------------------+   +---------------------+   +---------------------+
         |                         |                         |
         |                         |                         |
         v                         v                         v
+-----------------------------------------------------------------------+
|                           安全聚合协议                                  |
+-----------------------------------------------------------------------+
| 1. 加密阶段: 各客户端加密其模型更新                                        |
| 2. 掩码阶段: 各客户端添加随机掩码,相互抵消的方式确保掩码和为零                 |
| 3. 聚合阶段: 服务器聚合加密的更新,掩码相互抵消                              |
| 4. 解密阶段: 协议确保只有最终聚合结果被解密,各客户端的输入保持隐私              |
+-----------------------------------------------------------------------+
                                 |
                                 v
+-----------------------------------------------------------------------+
|                             服务器                                     |
+-----------------------------------------------------------------------+
| 1. 接收聚合后的模型更新                                                   |
| 2. 应用到全局模型                                                        |
| 3. 分发新的全局模型                                                      |
+-----------------------------------------------------------------------+

4. 联邦学习系统的综合实例:联邦学习图像分类器

为了将今天所学的内容综合起来,让我们实现一个具有差分隐私保护的横向联邦学习图像分类系统:

import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
import numpy as np
from opacus import PrivacyEngine
import copy
import matplotlib.pyplot as plt

# 设置随机种子
torch.manual_seed(42)
np.random.seed(42)

# 1. 数据准备
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5,), (0.5,))
])

# 加载MNIST数据集
mnist_train = torchvision.datasets.MNIST(root='./data', train=True, download=True, transform=transform)
mnist_test = torchvision.datasets.MNIST(root='./data', train=False, download=True, transform=transform)

# 模拟3个不同的客户端,将数据集水平划分
def split_data(dataset, num_clients):
    """水平划分数据集给不同客户端"""
    num_items_per_client = len(dataset) // num_clients
    client_datasets = []
    
    for i in range(num_clients):
        start_idx = i * num_items_per_client
        end_idx = (i + 1) * num_items_per_client if i < num_clients - 1 else len(dataset)
        indices = range(start_idx, end_idx)
        client_datasets.append(torch.utils.data.Subset(dataset, indices))
    
    return client_datasets

num_clients = 3
client_datasets = split_data(mnist_train, num_clients)
test_loader = torch.utils.data.DataLoader(mnist_test, batch_size=64, shuffle=False)

# 2. 模型定义
class MnistCNN(nn.Module):
    def __init__(self):
        super(MnistCNN, self).__init__()
        self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
        self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
        self.fc1 = nn.Linear(320, 50)
        self.fc2 = nn.Linear(50, 10)
        self.dropout = nn.Dropout2d()
        self.pool = nn.MaxPool2d(2)
        self.relu = nn.ReLU()
        
    def forward(self, x):
        x = self.relu(self.pool(self.conv1(x)))
        x = self.relu(self.pool(self.dropout(self.conv2(x))))
        x = x.view(-1, 320)
        x = self.relu(self.fc1(x))
        x = self.dropout(x)
        x = self.fc2(x)
        return x

# 3. 联邦学习实现
class FederatedLearningWithDP:
    def __init__(self, global_model, client_datasets, test_loader, 
                 num_rounds=5, local_epochs=1, learning_rate=0.01, 
                 noise_multiplier=1.0, max_grad_norm=1.0, delta=1e-5):
        self.global_model = global_model
        self.client_datasets = client_datasets
        self.test_loader = test_loader
        self.num_rounds = num_rounds
        self.local_epochs = local_epochs
        self.learning_rate = learning_rate
        self.noise_multiplier = noise_multiplier
        self.max_grad_norm = max_grad_norm
        self.delta = delta
        self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
        
        self.global_model.to(self.device)
        print(f"使用设备: {self.device}")
        
    def train_client(self, client_id, global_model_state):
        """在单个客户端上训练模型(带差分隐私)"""
        # 复制全局模型
        local_model = MnistCNN().to(self.device)
        local_model.load_state_dict(global_model_state)
        local_model.train()
        
        # 准备数据
        train_loader = torch.utils.data.DataLoader(
            self.client_datasets[client_id], batch_size=32, shuffle=True
        )
        
        # 设置优化器
        optimizer = optim.SGD(local_model.parameters(), lr=self.learning_rate)
        criterion = nn.CrossEntropyLoss()
        
        # 添加差分隐私
        privacy_engine = PrivacyEngine()
        model, optimizer, train_loader = privacy_engine.make_private(
            module=local_model,
            optimizer=optimizer,
            data_loader=train_loader,
            noise_multiplier=self.noise_multiplier,
            max_grad_norm=self.max_grad_norm,
        )
        
        # 训练模型
        for epoch in range(self.local_epochs):
            for data, target in train_loader:
                data, target = data.to(self.device), target.to(self.device)
                optimizer.zero_grad()
                output = model(data)
                loss = criterion(output, target)
                loss.backward()
                optimizer.step()
        
        # 计算隐私预算
        epsilon = privacy_engine.get_epsilon(delta=self.delta)
        print(f"  客户端 {client_id+1} 的隐私预算: ε = {epsilon:.2f}")
        
        return local_model.state_dict()
    
    def aggregate_models(self, model_states):
        """使用FedAvg聚合客户端模型"""
        # 初始化一个空字典来存储聚合后的参数
        global_dict = copy.deepcopy(model_states[0])
        
        # 对所有模型参数求平均
        for key in global_dict.keys():
            global_dict[key] = torch.stack([states[key] for states in model_states]).mean(0)
        
        return global_dict
    
    def evaluate(self, model):
        """评估模型性能"""
        model.eval()
        test_loss = 0
        correct = 0
        criterion = nn.CrossEntropyLoss()
        
        with torch.no_grad():
            for data, target in self.test_loader:
                data, target = data.to(self.device), target.to(self.device)
                output = model(data)
                test_loss += criterion(output, target).item()
                pred = output.argmax(dim=1, keepdim=True)
                correct += pred.eq(target.view_as(pred)).sum().item()
        
        test_loss /= len(self.test_loader.dataset)
        accuracy = 100. * correct / len(self.test_loader.dataset)
        
        return test_loss, accuracy
    
    def train(self):
        """执行联邦学习过程"""
        accuracies = []
        
        for round_num in range(self.num_rounds):
            print(f"联邦学习轮次 {round_num+1}/{self.num_rounds}")
            
            # 收集每个客户端的模型
            client_models = []
            for client_id in range(len(self.client_datasets)):
                print(f"  训练客户端 {client_id+1}")
                client_model = self.train_client(client_id, copy.deepcopy(self.global_model.state_dict()))
                client_models.append(client_model)
            
            # 聚合模型
            global_dict = self.aggregate_models(client_models)
            self.global_model.load_state_dict(global_dict)
            
            # 评估全局模型
            test_loss, accuracy = self.evaluate(self.global_model)
            accuracies.append(accuracy)
            print(f"  轮次 {round_num+1} 完成, 测试损失: {test_loss:.4f}, 准确率: {accuracy:.2f}%")
        
        return self.global_model, accuracies

# 4. 执行联邦学习
global_model = MnistCNN()
fl_with_dp = FederatedLearningWithDP(
    global_model=global_model,
    client_datasets=client_datasets,
    test_loader=test_loader,
    num_rounds=10,
    local_epochs=1,
    learning_rate=0.01,
    noise_multiplier=1.0,  # 控制添加噪声的程度
    max_grad_norm=1.0
)

print("开始联邦学习训练(带差分隐私保护)...")
final_model, accuracies = fl_with_dp.train()

# 5. 可视化结果
plt.figure(figsize=(10, 5))
plt.plot(range(1, len(accuracies) + 1), accuracies, marker='o')
plt.xlabel('联邦学习轮次')
plt.ylabel('测试准确率 (%)')
plt.title('联邦学习过程中的模型准确率')
plt.grid(True)
plt.savefig('federated_learning_accuracy.png')
plt.close()

print(f"\n联邦学习完成! 最终模型准确率: {accuracies[-1]:.2f}%")

5. 总结与实践应用

今天我们深入学习了联邦学习系统的三个核心内容:差分隐私机制、横向/纵向联邦学习的数据划分策略,以及模型聚合的安全协议。

5.1 关键要点总结

  1. 差分隐私是一种数学框架,通过添加经过校准的噪声来保护个体隐私,同时允许对数据集进行有用的分析。

    • 常用机制包括拉普拉斯机制、高斯机制和指数机制
    • 隐私预算(ε)控制着隐私保护强度与实用性之间的权衡
  2. 横向联邦学习适用于参与方拥有相同特征空间但不同样本的情况。

    • 实现相对简单,通常使用FedAvg等聚合算法
    • 主要保护样本隐私
  3. 纵向联邦学习适用于参与方拥有相同样本但不同特征的情况。

    • 实现较为复杂,需要安全的实体对齐和协作训练
    • 主要保护特征隐私
  4. 安全聚合协议确保在聚合模型参数时不会泄露个体参与方的信息。

    • 常见方法包括安全聚合(SecAgg)、同态加密和多方安全计算(MPC)
    • 不同方法在安全性、通信开销和计算开销上各有权衡

5.2 实际应用场景

联邦学习系统的典型应用场景包括:

  • 医疗健康: 不同医院可以在不共享病人敏感数据的情况下,协作训练疾病诊断模型
  • 金融科技: 银行、保险和信贷机构可以共同构建风险评估模型,同时保护客户隐私
  • 智能手机: Google已经在Android设备上应用联邦学习来训练键盘预测和语音识别模型
  • 智能交通: 不同车企和交通部门可以共享数据来训练交通预测模型
  • 跨企业协作: 不同企业可以在不泄露商业机密的情况下共同训练模型

5.3 学习资源推荐

要深入学习联邦学习系统,以下资源非常有用:

  1. 理论学习:

    • 《联邦学习》(杨强等著)
    • “Communication-Efficient Learning of Deep Networks from Decentralized Data” (联邦学习原始论文)
    • “Deep Learning with Differential Privacy” (差分隐私在深度学习中的应用)
  2. 编程资源:

    • PySyft: 用于隐私保护机器学习的Python库
    • Flower: 专注于联邦学习的开源框架
    • TensorFlow Federated: Google的联邦学习框架
    • Opacus: Facebook的PyTorch差分隐私库
  3. 实践项目:

    • 构建一个具有差分隐私保护的联邦学习系统
    • 尝试实现横向和纵向联邦学习的对比实验
    • 探索不同安全聚合协议的性能比较

清华大学全五版的《DeepSeek教程》完整的文档需要的朋友,关注我私信:deepseek 即可获得。

怎么样今天的内容还满意吗?再次感谢朋友们的观看,关注GZH:凡人的AI工具箱,回复666,送您价值199的AI大礼包。最后,祝您早日实现财务自由,还请给个赞,谢谢!


网站公告

今日签到

点亮在社区的每一天
去签到