📋 Article Overview
As internet infrastructure grows ever more complex, a single-node proxy service can no longer meet the needs of enterprise-grade applications. This article takes a deep dive into building a production-grade distributed IP proxy cluster, covering the complete solution from architecture design and intelligent scheduling to performance optimization and operational monitoring.
By the end of this article, you will understand:
- 🏗️ Distributed cluster architecture design principles
- 🤖 AI-driven intelligent scheduling algorithms
- ⚡ High-performance optimization techniques in practice
- 🔧 Automated operations and monitoring systems
- 🌍 Global distributed deployment strategies
🏗️ Distributed Cluster Architecture Design
1.1 Overall System Architecture
The distributed IP proxy cluster uses a layered design to ensure high availability and horizontal scalability:
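The original architecture diagram does not survive in this text; as a rough sketch inferred from the components built in the rest of this article, the layers stack as follows:

Client traffic
  └─ Access layer: HighPerformanceLoadBalancer (entry point, failover, circuit breaking)
      └─ Scheduling layer: IntelligentProxyScheduler (picks the target node)
          └─ Proxy layer: DistributedProxyNode instances (forward requests)
              └─ Target services
Control/state plane: ProxyClusterManager + Redis (registration, health, state sync), with per-node performance monitors feeding metrics back up.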
1.2 Core Component Design
1.2.1 Proxy Service Node
import asyncio
import aiohttp
import time
from typing import Dict, List, Optional, Any, Tuple
from dataclasses import dataclass, field
from enum import Enum
import json
import logging
import uuid
from datetime import datetime
import hashlib
class ProxyNodeStatus(Enum):
"""代理节点状态"""
INITIALIZING = "initializing"
HEALTHY = "healthy"
DEGRADED = "degraded"
UNHEALTHY = "unhealthy"
MAINTENANCE = "maintenance"
OFFLINE = "offline"
class ProxyType(Enum):
"""代理类型"""
HTTP = "http"
HTTPS = "https"
SOCKS4 = "socks4"
SOCKS5 = "socks5"
@dataclass
class ProxyNodeConfig:
"""代理节点配置"""
node_id: str
host: str
port: int
proxy_type: ProxyType
region: str
datacenter: str
max_connections: int = 1000
max_bandwidth_mbps: int = 100
health_check_interval: int = 30
timeout_seconds: int = 30
@dataclass
class ProxyNodeMetrics:
"""代理节点指标"""
active_connections: int = 0
total_requests: int = 0
success_requests: int = 0
failed_requests: int = 0
average_response_time: float = 0.0
bandwidth_usage_mbps: float = 0.0
cpu_usage_percent: float = 0.0
memory_usage_percent: float = 0.0
last_health_check: datetime = field(default_factory=datetime.now)
uptime_seconds: int = 0
class DistributedProxyNode:
"""分布式代理节点"""
def __init__(self, config: ProxyNodeConfig):
self.config = config
self.status = ProxyNodeStatus.INITIALIZING
self.metrics = ProxyNodeMetrics()
        # Connection pool management
        self.connection_pool = {}
        self.active_sessions = {}
        # Health checking
        self.health_checker = None
        self.last_health_check = time.time()
        # Performance monitoring
        self.performance_monitor = ProxyNodePerformanceMonitor()
        # Request statistics
self.request_stats = {
'total': 0,
'success': 0,
'failed': 0,
'avg_response_time': 0.0
}
self.start_time = time.time()
    async def initialize(self):
        """Initialize the proxy node."""
        try:
            print(f"Initializing proxy node {self.config.node_id}")
            # Create the connection pool
            connector = aiohttp.TCPConnector(
                limit=self.config.max_connections,
                limit_per_host=100,
                ttl_dns_cache=300,
                use_dns_cache=True,
            )
            self.session = aiohttp.ClientSession(
                connector=connector,
                timeout=aiohttp.ClientTimeout(total=self.config.timeout_seconds)
            )
            # Start the health-check loop
            self.health_checker = asyncio.create_task(self._health_check_loop())
            # Start performance monitoring
            await self.performance_monitor.start(self)
            self.status = ProxyNodeStatus.HEALTHY
            print(f"Proxy node {self.config.node_id} initialized")
        except Exception as e:
            print(f"Node initialization failed: {e}")
            self.status = ProxyNodeStatus.OFFLINE
            raise
    async def handle_proxy_request(self, request_data: Dict) -> Dict[str, Any]:
        """Handle a proxy request."""
        start_time = time.time()
        request_id = str(uuid.uuid4())
        try:
            # Record the start of the request
            self.metrics.active_connections += 1
            self.metrics.total_requests += 1
            # Build the outbound request
            target_url = request_data['url']
            method = request_data.get('method', 'GET')
            headers = request_data.get('headers', {})
            data = request_data.get('data')
            # Tag the request with proxy identifiers
            headers['X-Proxy-Node'] = self.config.node_id
            headers['X-Request-ID'] = request_id
            # Send the request
            async with self.session.request(
                method=method,
                url=target_url,
                headers=headers,
                data=data
            ) as response:
                content = await response.read()
                response_data = {
                    'status_code': response.status,
                    'headers': dict(response.headers),
                    'content': content.decode('utf-8', errors='ignore'),
                    'request_id': request_id,
                    'node_id': self.config.node_id,
                    'response_time': time.time() - start_time
                }
                # Update success statistics
                self.metrics.success_requests += 1
                self._update_response_time(time.time() - start_time)
                return {
                    'success': True,
                    'data': response_data
                }
        except Exception as e:
            # Update failure statistics
            self.metrics.failed_requests += 1
            return {
                'success': False,
                'error': str(e),
                'request_id': request_id,
                'node_id': self.config.node_id,
                'response_time': time.time() - start_time
            }
        finally:
            # Update connection statistics
            self.metrics.active_connections -= 1
    async def _health_check_loop(self):
        """Health-check loop."""
        while True:
            try:
                await asyncio.sleep(self.config.health_check_interval)
                # Run the health check
                is_healthy = await self._perform_health_check()
                if is_healthy:
                    if self.status == ProxyNodeStatus.UNHEALTHY:
                        self.status = ProxyNodeStatus.HEALTHY
                        print(f"Node {self.config.node_id} recovered")
                else:
                    if self.status == ProxyNodeStatus.HEALTHY:
                        self.status = ProxyNodeStatus.UNHEALTHY
                        print(f"Node {self.config.node_id} became unhealthy")
                self.metrics.last_health_check = datetime.now()
                self.last_health_check = time.time()
            except Exception as e:
                print(f"Health check failed: {e}")
                self.status = ProxyNodeStatus.UNHEALTHY
    async def _perform_health_check(self) -> bool:
        """Perform a single health check."""
        try:
            # Check network connectivity
            test_url = "https://httpbin.org/ip"
            start_time = time.time()
            async with self.session.get(test_url) as response:
                if response.status == 200:
                    response_time = time.time() - start_time
                    # Check the response time
                    if response_time > self.config.timeout_seconds:
                        return False
                    # Check the connection count
                    if self.metrics.active_connections > self.config.max_connections * 0.9:
                        return False
                    # Check the error rate
                    if self.metrics.total_requests > 0:
                        error_rate = self.metrics.failed_requests / self.metrics.total_requests
                        if error_rate > 0.1:  # 10% error-rate threshold
                            return False
                    return True
                else:
                    return False
        except Exception:
            return False
    def _update_response_time(self, response_time: float):
        """Update the response-time statistics."""
        if self.metrics.success_requests == 1:
            # The first successful request seeds the average
            self.metrics.average_response_time = response_time
        else:
            # Exponential moving average
            alpha = 0.1  # smoothing factor
            self.metrics.average_response_time = (
                alpha * response_time +
                (1 - alpha) * self.metrics.average_response_time
            )
def get_node_info(self) -> Dict[str, Any]:
"""获取节点信息"""
self.metrics.uptime_seconds = int(time.time() - self.start_time)
return {
'node_id': self.config.node_id,
'status': self.status.value,
'config': {
'host': self.config.host,
'port': self.config.port,
'proxy_type': self.config.proxy_type.value,
'region': self.config.region,
'datacenter': self.config.datacenter,
'max_connections': self.config.max_connections,
'max_bandwidth_mbps': self.config.max_bandwidth_mbps
},
'metrics': {
'active_connections': self.metrics.active_connections,
'total_requests': self.metrics.total_requests,
'success_requests': self.metrics.success_requests,
'failed_requests': self.metrics.failed_requests,
'average_response_time': self.metrics.average_response_time,
'bandwidth_usage_mbps': self.metrics.bandwidth_usage_mbps,
'cpu_usage_percent': self.metrics.cpu_usage_percent,
'memory_usage_percent': self.metrics.memory_usage_percent,
'uptime_seconds': self.metrics.uptime_seconds,
'success_rate': (
self.metrics.success_requests / max(self.metrics.total_requests, 1)
)
},
'last_health_check': self.metrics.last_health_check.isoformat()
}
    async def shutdown(self):
        """Shut down the proxy node."""
        try:
            print(f"Shutting down proxy node {self.config.node_id}")
            # Stop health checking
            if self.health_checker:
                self.health_checker.cancel()
            # Stop performance monitoring
            await self.performance_monitor.stop()
            # Close the client session
            if hasattr(self, 'session'):
                await self.session.close()
            self.status = ProxyNodeStatus.OFFLINE
            print(f"Proxy node {self.config.node_id} shut down")
        except Exception as e:
            print(f"Failed to shut down node: {e}")
class ProxyNodePerformanceMonitor:
"""代理节点性能监控"""
def __init__(self):
self.monitoring_task = None
self.node = None
    async def start(self, node: DistributedProxyNode):
        """Start performance monitoring."""
        self.node = node
        self.monitoring_task = asyncio.create_task(self._monitor_loop())
    async def _monitor_loop(self):
        """Monitoring loop."""
        while True:
            try:
                await asyncio.sleep(10)  # sample every 10 seconds
                # Collect system metrics
                await self._collect_system_metrics()
            except Exception as e:
                print(f"Performance monitoring error: {e}")
    async def _collect_system_metrics(self):
        """Collect system metrics."""
        try:
            import psutil
            # CPU usage (interval=None avoids blocking the event loop for a second)
            self.node.metrics.cpu_usage_percent = psutil.cpu_percent(interval=None)
            # Memory usage
            memory = psutil.virtual_memory()
            self.node.metrics.memory_usage_percent = memory.percent
            # Network bandwidth (simplified)
            # A real implementation would compute this per network interface
            self.node.metrics.bandwidth_usage_mbps = 0.0
        except ImportError:
            # Fall back to simulated data if psutil is unavailable
            self.node.metrics.cpu_usage_percent = 20.0
            self.node.metrics.memory_usage_percent = 60.0
            self.node.metrics.bandwidth_usage_mbps = 10.0
async def stop(self):
"""停止性能监控"""
if self.monitoring_task:
self.monitoring_task.cancel()
# Proxy node usage example
async def proxy_node_example():
    """Proxy node example."""
    # Create the node configuration
config = ProxyNodeConfig(
node_id="proxy-node-01",
host="10.0.1.100",
port=8080,
proxy_type=ProxyType.HTTP,
region="us-east-1",
datacenter="dc-01",
max_connections=500,
max_bandwidth_mbps=50,
health_check_interval=30
)
    # Create the proxy node
    proxy_node = DistributedProxyNode(config)
    try:
        # Initialize the node
        await proxy_node.initialize()
        # Simulate handling requests
test_requests = [
{
'url': 'https://httpbin.org/get',
'method': 'GET',
'headers': {'User-Agent': 'ProxyNode/1.0'}
},
{
'url': 'https://httpbin.org/post',
'method': 'POST',
'headers': {'Content-Type': 'application/json'},
'data': json.dumps({'test': 'data'})
}
]
        # Handle the requests concurrently
tasks = [
proxy_node.handle_proxy_request(request)
for request in test_requests
]
results = await asyncio.gather(*tasks)
        # Print the results
        for i, result in enumerate(results):
            print(f"Request {i+1} result: {result['success']}")
            if result['success']:
                print(f"  Response time: {result['data']['response_time']:.3f}s")
                print(f"  Status code: {result['data']['status_code']}")
            else:
                print(f"  Error: {result['error']}")
        # Show node information
        node_info = proxy_node.get_node_info()
        print(f"\nNode info:")
        print(f"Node ID: {node_info['node_id']}")
        print(f"Status: {node_info['status']}")
        print(f"Total requests: {node_info['metrics']['total_requests']}")
        print(f"Success rate: {node_info['metrics']['success_rate']:.2%}")
        print(f"Average response time: {node_info['metrics']['average_response_time']:.3f}s")
        # Keep running for a while
        await asyncio.sleep(60)
finally:
await proxy_node.shutdown()
if __name__ == "__main__":
asyncio.run(proxy_node_example())
1.2.2 Cluster Management Service
The cluster management service handles node registration, health monitoring, and state synchronization:
import asyncio
import aioredis
import json
from typing import Any, Dict, List, Optional, Set
from dataclasses import asdict
from datetime import datetime, timedelta
import logging
class ProxyClusterManager:
"""代理集群管理器"""
def __init__(self, redis_url: str = "redis://localhost:6379"):
self.redis_url = redis_url
self.redis_client = None
        # Cluster state
        self.nodes: Dict[str, Dict] = {}
        self.regions: Dict[str, List[str]] = {}
        self.datacenters: Dict[str, List[str]] = {}
        # Monitoring tasks
        self.health_monitor_task = None
        self.cluster_sync_task = None
        # Event callbacks
        self.event_callbacks = {
'node_added': [],
'node_removed': [],
'node_status_changed': [],
'cluster_rebalanced': []
}
    async def start(self):
        """Start the cluster manager."""
        try:
            # Connect to Redis (aioredis 2.x: from_url returns a client without awaiting)
            self.redis_client = aioredis.from_url(self.redis_url)
            # Load existing nodes
            await self._load_cluster_state()
            # Start monitoring tasks
            self.health_monitor_task = asyncio.create_task(self._health_monitor_loop())
            self.cluster_sync_task = asyncio.create_task(self._cluster_sync_loop())
            print("Proxy cluster manager started")
        except Exception as e:
            print(f"Failed to start cluster manager: {e}")
            raise
    async def register_node(self, node_info: Dict) -> bool:
        """Register a proxy node."""
        try:
            node_id = node_info['node_id']
            region = node_info['config']['region']
            datacenter = node_info['config']['datacenter']
            # Store the node information
            self.nodes[node_id] = {
                **node_info,
                'registered_at': datetime.now().isoformat(),
                'last_seen': datetime.now().isoformat()
            }
            # Update the region and datacenter mappings
            if region not in self.regions:
                self.regions[region] = []
            if node_id not in self.regions[region]:
                self.regions[region].append(node_id)
            if datacenter not in self.datacenters:
                self.datacenters[datacenter] = []
            if node_id not in self.datacenters[datacenter]:
                self.datacenters[datacenter].append(node_id)
            # Persist to Redis
            await self.redis_client.hset(
                "proxy_nodes",
                node_id,
                json.dumps(self.nodes[node_id])
            )
            # Fire the event
            await self._trigger_event('node_added', {'node_id': node_id, 'node_info': node_info})
            print(f"Node registered: {node_id}")
            return True
        except Exception as e:
            print(f"Node registration failed: {e}")
            return False
    async def unregister_node(self, node_id: str) -> bool:
        """Unregister a proxy node."""
        try:
            if node_id not in self.nodes:
                return False
            node_info = self.nodes[node_id]
            region = node_info['config']['region']
            datacenter = node_info['config']['datacenter']
            # Remove from the cluster
            del self.nodes[node_id]
            # Update the region and datacenter mappings
            if region in self.regions and node_id in self.regions[region]:
                self.regions[region].remove(node_id)
                if not self.regions[region]:
                    del self.regions[region]
            if datacenter in self.datacenters and node_id in self.datacenters[datacenter]:
                self.datacenters[datacenter].remove(node_id)
                if not self.datacenters[datacenter]:
                    del self.datacenters[datacenter]
            # Delete from Redis
            await self.redis_client.hdel("proxy_nodes", node_id)
            # Fire the event
            await self._trigger_event('node_removed', {'node_id': node_id, 'node_info': node_info})
            print(f"Node unregistered: {node_id}")
            return True
        except Exception as e:
            print(f"Node unregistration failed: {e}")
            return False
    async def update_node_status(self, node_id: str, status_update: Dict) -> bool:
        """Update a node's status."""
        try:
            if node_id not in self.nodes:
                return False
            old_status = self.nodes[node_id].get('status')
            # Update the node information
            self.nodes[node_id].update(status_update)
            self.nodes[node_id]['last_seen'] = datetime.now().isoformat()
            # Persist to Redis
            await self.redis_client.hset(
                "proxy_nodes",
                node_id,
                json.dumps(self.nodes[node_id])
            )
            new_status = self.nodes[node_id].get('status')
            # Fire an event if the status changed
            if old_status != new_status:
                await self._trigger_event('node_status_changed', {
                    'node_id': node_id,
                    'old_status': old_status,
                    'new_status': new_status
                })
            return True
        except Exception as e:
            print(f"Failed to update node status: {e}")
            return False
    async def get_healthy_nodes(self, region: Optional[str] = None, datacenter: Optional[str] = None) -> List[Dict]:
        """Return the list of healthy nodes."""
        healthy_nodes = []
        for node_id, node_info in self.nodes.items():
            if node_info.get('status') != 'healthy':
                continue
            # Region filter
            if region and node_info['config']['region'] != region:
                continue
            # Datacenter filter
            if datacenter and node_info['config']['datacenter'] != datacenter:
                continue
            healthy_nodes.append(node_info)
        return healthy_nodes
async def get_cluster_stats(self) -> Dict[str, Any]:
"""获取集群统计"""
total_nodes = len(self.nodes)
healthy_nodes = 0
unhealthy_nodes = 0
total_requests = 0
total_success = 0
for node_info in self.nodes.values():
status = node_info.get('status')
if status == 'healthy':
healthy_nodes += 1
else:
unhealthy_nodes += 1
metrics = node_info.get('metrics', {})
total_requests += metrics.get('total_requests', 0)
total_success += metrics.get('success_requests', 0)
return {
'total_nodes': total_nodes,
'healthy_nodes': healthy_nodes,
'unhealthy_nodes': unhealthy_nodes,
'regions': len(self.regions),
'datacenters': len(self.datacenters),
'total_requests': total_requests,
'total_success': total_success,
'success_rate': total_success / max(total_requests, 1),
'region_distribution': {region: len(nodes) for region, nodes in self.regions.items()},
'datacenter_distribution': {dc: len(nodes) for dc, nodes in self.datacenters.items()}
}
    async def _load_cluster_state(self):
        """Load the cluster state."""
        try:
            # Load node information from Redis
            nodes_data = await self.redis_client.hgetall("proxy_nodes")
            for raw_node_id, node_data in nodes_data.items():
                node_id = raw_node_id.decode()
                node_info = json.loads(node_data.decode())
                self.nodes[node_id] = node_info
                # Rebuild the region and datacenter mappings
                region = node_info['config']['region']
                datacenter = node_info['config']['datacenter']
                if region not in self.regions:
                    self.regions[region] = []
                if node_id not in self.regions[region]:
                    self.regions[region].append(node_id)
                if datacenter not in self.datacenters:
                    self.datacenters[datacenter] = []
                if node_id not in self.datacenters[datacenter]:
                    self.datacenters[datacenter].append(node_id)
            print(f"Loaded {len(self.nodes)} proxy nodes")
        except Exception as e:
            print(f"Failed to load cluster state: {e}")
    async def _health_monitor_loop(self):
        """Health-monitoring loop."""
        while True:
            try:
                await asyncio.sleep(60)  # check every minute
                current_time = datetime.now()
                inactive_nodes = []
                # Check node liveness
                for node_id, node_info in self.nodes.items():
                    last_seen = datetime.fromisoformat(node_info['last_seen'])
                    # Mark nodes as inactive after 5 minutes without an update
                    if current_time - last_seen > timedelta(minutes=5):
                        inactive_nodes.append(node_id)
                # Handle inactive nodes
                for node_id in inactive_nodes:
                    print(f"Node {node_id} has not reported in; marking it offline")
                    await self.update_node_status(node_id, {'status': 'offline'})
            except Exception as e:
                print(f"Health monitoring error: {e}")
    async def _cluster_sync_loop(self):
        """Cluster synchronization loop."""
        while True:
            try:
                await asyncio.sleep(30)  # sync every 30 seconds
                # Check whether the cluster needs rebalancing
                should_rebalance = await self._should_rebalance_cluster()
                if should_rebalance:
                    await self._rebalance_cluster()
            except Exception as e:
                print(f"Cluster sync error: {e}")
    async def _should_rebalance_cluster(self) -> bool:
        """Decide whether the cluster needs rebalancing."""
        # Simplified rebalancing logic
        healthy_nodes = await self.get_healthy_nodes()
        if len(healthy_nodes) == 0:
            return False
        # Check whether the load is evenly distributed
        total_requests = sum(
            node['metrics'].get('total_requests', 0)
            for node in healthy_nodes
        )
        if total_requests == 0:
            return False
        avg_requests = total_requests / len(healthy_nodes)
        # Rebalance if any node has handled more than twice the average
        for node in healthy_nodes:
            node_requests = node['metrics'].get('total_requests', 0)
            if node_requests > avg_requests * 2:
                return True
        return False
    async def _rebalance_cluster(self):
        """Perform a cluster rebalance."""
        print("Starting cluster rebalance...")
        # Fire the rebalance event
        await self._trigger_event('cluster_rebalanced', {
            'timestamp': datetime.now().isoformat(),
            'reason': 'load_imbalance'
        })
        print("Cluster rebalance complete")
async def _trigger_event(self, event_type: str, event_data: Dict):
"""触发事件回调"""
if event_type in self.event_callbacks:
for callback in self.event_callbacks[event_type]:
try:
if asyncio.iscoroutinefunction(callback):
await callback(event_data)
else:
callback(event_data)
                except Exception as e:
                    print(f"Event callback error: {e}")
    def add_event_callback(self, event_type: str, callback):
        """Register an event callback."""
        if event_type in self.event_callbacks:
            self.event_callbacks[event_type].append(callback)
    async def shutdown(self):
        """Shut down the cluster manager."""
        try:
            # Stop monitoring tasks
            if self.health_monitor_task:
                self.health_monitor_task.cancel()
            if self.cluster_sync_task:
                self.cluster_sync_task.cancel()
            # Close the Redis connection
            if self.redis_client:
                await self.redis_client.close()
            print("Proxy cluster manager shut down")
        except Exception as e:
            print(f"Failed to shut down cluster manager: {e}")
# Cluster management example
async def cluster_management_example():
    """Cluster management example."""
    # Create the cluster manager
    cluster_manager = ProxyClusterManager()
    # Register event callbacks
    def on_node_added(event_data):
        print(f"Node joined: {event_data['node_id']}")
    def on_node_status_changed(event_data):
        print(f"Node status changed: {event_data['node_id']} {event_data['old_status']} -> {event_data['new_status']}")
    cluster_manager.add_event_callback('node_added', on_node_added)
    cluster_manager.add_event_callback('node_status_changed', on_node_status_changed)
try:
await cluster_manager.start()
        # Simulate node registration
test_nodes = [
{
'node_id': f'proxy-node-{i:02d}',
'status': 'healthy',
'config': {
'host': f'10.0.1.{100+i}',
'port': 8080,
'proxy_type': 'http',
'region': 'us-east-1' if i < 3 else 'us-west-1',
'datacenter': f'dc-0{(i//2)+1}',
'max_connections': 1000,
'max_bandwidth_mbps': 100
},
'metrics': {
'active_connections': 10 + i * 5,
'total_requests': 1000 + i * 100,
'success_requests': 950 + i * 95,
'failed_requests': 50 + i * 5,
'average_response_time': 0.1 + i * 0.01,
'cpu_usage_percent': 20.0 + i * 5,
'memory_usage_percent': 60.0 + i * 2,
'uptime_seconds': 3600
}
}
for i in range(6)
]
        # Register the nodes
        for node_info in test_nodes:
            success = await cluster_manager.register_node(node_info)
            print(f"Registering node {node_info['node_id']}: {'ok' if success else 'failed'}")
        # Fetch cluster statistics
        stats = await cluster_manager.get_cluster_stats()
        print(f"\nCluster statistics:")
        print(f"Total nodes: {stats['total_nodes']}")
        print(f"Healthy nodes: {stats['healthy_nodes']}")
        print(f"Regions: {stats['regions']}")
        print(f"Datacenters: {stats['datacenters']}")
        print(f"Total requests: {stats['total_requests']}")
        print(f"Success rate: {stats['success_rate']:.2%}")
        print(f"Region distribution: {stats['region_distribution']}")
        # Fetch healthy nodes in a specific region
        us_east_nodes = await cluster_manager.get_healthy_nodes(region='us-east-1')
        print(f"\nHealthy nodes in us-east-1: {len(us_east_nodes)}")
        # Simulate a status update
        await asyncio.sleep(5)
        await cluster_manager.update_node_status('proxy-node-00', {'status': 'degraded'})
        # Keep running for a while
        await asyncio.sleep(60)
    finally:
        await cluster_manager.shutdown()
if __name__ == "__main__":
asyncio.run(cluster_management_example())
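The health monitor above marks a node offline when its `last_seen` timestamp is more than five minutes old, but nothing in the node code actually reports in. A minimal heartbeat loop bridging DistributedProxyNode and ProxyClusterManager might look like the sketch below; it calls the manager directly for illustration, whereas a real deployment would push the same payload over an RPC or HTTP API.

import asyncio

async def node_heartbeat_loop(node: DistributedProxyNode,
                              manager: ProxyClusterManager,
                              interval: int = 60):
    """Illustrative heartbeat: register once, then refresh status periodically."""
    await manager.register_node(node.get_node_info())
    while True:
        await asyncio.sleep(interval)
        info = node.get_node_info()
        # update_node_status refreshes 'last_seen', keeping the node
        # out of the health monitor's inactive list
        await manager.update_node_status(info['node_id'], {
            'status': info['status'],
            'metrics': info['metrics']
        })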
🤖 Intelligent Scheduling Algorithms
2.1 Scheduling Strategy Design
The intelligent scheduler uses machine-learning techniques to dynamically select the optimal proxy node:
import numpy as np
import pandas as pd
from typing import Dict, List, Optional, Tuple, Any
from dataclasses import dataclass
from enum import Enum
import time
import math
from sklearn.ensemble import RandomForestRegressor
from sklearn.preprocessing import StandardScaler
import pickle
import json
class SchedulingAlgorithm(Enum):
"""调度算法类型"""
ROUND_ROBIN = "round_robin"
WEIGHTED_ROUND_ROBIN = "weighted_round_robin"
LEAST_CONNECTIONS = "least_connections"
LEAST_RESPONSE_TIME = "least_response_time"
GEOGRAPHIC_PROXIMITY = "geographic_proximity"
MACHINE_LEARNING = "machine_learning"
HYBRID = "hybrid"
@dataclass
class RequestContext:
"""请求上下文"""
request_id: str
client_ip: str
target_url: str
request_method: str
request_size: int
    priority: int = 1  # 1-10, where 10 is highest
    geo_location: Optional[Dict[str, float]] = None  # {"lat": ..., "lng": ...}
user_agent: str = ""
expected_response_size: int = 0
timeout_seconds: int = 30
retry_count: int = 0
@dataclass
class SchedulingDecision:
"""调度决策"""
selected_node_id: str
confidence_score: float # 0-1
decision_time_ms: float
algorithm_used: str
fallback_nodes: List[str] = None
reasoning: str = ""
class IntelligentProxyScheduler:
"""智能代理调度器"""
def __init__(self, cluster_manager: ProxyClusterManager):
self.cluster_manager = cluster_manager
        # Scheduling algorithms
self.algorithms = {
SchedulingAlgorithm.ROUND_ROBIN: self._round_robin_schedule,
SchedulingAlgorithm.WEIGHTED_ROUND_ROBIN: self._weighted_round_robin_schedule,
SchedulingAlgorithm.LEAST_CONNECTIONS: self._least_connections_schedule,
SchedulingAlgorithm.LEAST_RESPONSE_TIME: self._least_response_time_schedule,
SchedulingAlgorithm.GEOGRAPHIC_PROXIMITY: self._geographic_proximity_schedule,
SchedulingAlgorithm.MACHINE_LEARNING: self._ml_schedule,
SchedulingAlgorithm.HYBRID: self._hybrid_schedule
}
        # Default scheduling strategy
        self.default_algorithm = SchedulingAlgorithm.HYBRID
        # Round-robin counters
        self.round_robin_counter = 0
        self.weighted_counters = {}
        # Machine learning model
        self.ml_model = None
        self.feature_scaler = StandardScaler()
        self.ml_model_trained = False
        # Scheduling history
        self.scheduling_history = []
        self.max_history_size = 10000
        # Per-algorithm performance statistics
        self.algorithm_performance = {}
        # Geo-location cache
        self.geo_cache = {}
    async def schedule_request(self,
                               request_context: RequestContext,
                               algorithm: SchedulingAlgorithm = None) -> SchedulingDecision:
        """Schedule a request onto the best node."""
        start_time = time.time()
        algorithm = algorithm or self.default_algorithm
        try:
            # Fetch available nodes
            available_nodes = await self.cluster_manager.get_healthy_nodes()
            if not available_nodes:
                raise Exception("No healthy nodes available")
            # Run the scheduling algorithm
            scheduler_func = self.algorithms[algorithm]
            decision = await scheduler_func(request_context, available_nodes)
            # Record the decision time
            decision.decision_time_ms = (time.time() - start_time) * 1000
            decision.algorithm_used = algorithm.value
            # Add to the scheduling history
            self._add_to_history(request_context, decision)
            # Update per-algorithm performance statistics
            self._update_algorithm_performance(algorithm, decision)
            return decision
        except Exception as e:
            # Degrade gracefully to round-robin
            if algorithm != SchedulingAlgorithm.ROUND_ROBIN:
                print(f"Scheduling algorithm {algorithm.value} failed: {e}; falling back to round-robin")
                return await self.schedule_request(request_context, SchedulingAlgorithm.ROUND_ROBIN)
            else:
                raise Exception(f"Scheduling failed: {e}")
    async def _round_robin_schedule(self,
                                    request_context: RequestContext,
                                    available_nodes: List[Dict]) -> SchedulingDecision:
        """Round-robin scheduling."""
        if not available_nodes:
            raise Exception("No nodes available")
        # Simple round-robin selection
        selected_node = available_nodes[self.round_robin_counter % len(available_nodes)]
        self.round_robin_counter += 1
        return SchedulingDecision(
            selected_node_id=selected_node['node_id'],
            confidence_score=1.0,
            decision_time_ms=0.0,
            algorithm_used="round_robin",
            reasoning="Round-robin selection"
        )
    async def _weighted_round_robin_schedule(self,
                                             request_context: RequestContext,
                                             available_nodes: List[Dict]) -> SchedulingDecision:
        """Weighted round-robin scheduling."""
        # Compute node weights from performance metrics
        weighted_nodes = []
        for node in available_nodes:
            metrics = node.get('metrics', {})
            # Weighting factors
            success_rate = metrics.get('success_requests', 0) / max(metrics.get('total_requests', 1), 1)
            avg_response_time = metrics.get('average_response_time', 1.0)
            cpu_usage = metrics.get('cpu_usage_percent', 50.0)
            # weight = success rate / (response time * CPU-usage factor)
            weight = success_rate / (avg_response_time * (1 + cpu_usage / 100))
            weighted_nodes.append({
                'node': node,
                'weight': max(weight, 0.1)  # minimum weight 0.1
            })
        # Weighted selection
        total_weight = sum(item['weight'] for item in weighted_nodes)
        # Walk the weights using a time-based target to spread selections
        target_weight = (time.time() * 1000) % total_weight
        current_weight = 0
        selected_item = weighted_nodes[0]
        for item in weighted_nodes:
            current_weight += item['weight']
            if current_weight >= target_weight:
                selected_item = item
                break
        selected_node = selected_item['node']
        return SchedulingDecision(
            selected_node_id=selected_node['node_id'],
            confidence_score=0.8,
            decision_time_ms=0.0,
            algorithm_used="weighted_round_robin",
            reasoning=f"Weighted selection, weight={selected_item['weight']:.3f}"
        )
    async def _least_connections_schedule(self,
                                          request_context: RequestContext,
                                          available_nodes: List[Dict]) -> SchedulingDecision:
        """Least-connections scheduling."""
        # Pick the node with the fewest active connections
        best_node = min(
            available_nodes,
            key=lambda node: node.get('metrics', {}).get('active_connections', 0)
        )
        connections = best_node.get('metrics', {}).get('active_connections', 0)
        return SchedulingDecision(
            selected_node_id=best_node['node_id'],
            confidence_score=0.9,
            decision_time_ms=0.0,
            algorithm_used="least_connections",
            reasoning=f"Fewest active connections: {connections}"
        )
    async def _least_response_time_schedule(self,
                                            request_context: RequestContext,
                                            available_nodes: List[Dict]) -> SchedulingDecision:
        """Least-response-time scheduling."""
        # Pick the node with the lowest average response time
        best_node = min(
            available_nodes,
            key=lambda node: node.get('metrics', {}).get('average_response_time', float('inf'))
        )
        response_time = best_node.get('metrics', {}).get('average_response_time', 0)
        return SchedulingDecision(
            selected_node_id=best_node['node_id'],
            confidence_score=0.85,
            decision_time_ms=0.0,
            algorithm_used="least_response_time",
            reasoning=f"Lowest average response time: {response_time:.3f}s"
        )
    async def _geographic_proximity_schedule(self,
                                             request_context: RequestContext,
                                             available_nodes: List[Dict]) -> SchedulingDecision:
        """Geographic proximity scheduling."""
        if not request_context.geo_location:
            # Without geo information, fall back to least-connections
            return await self._least_connections_schedule(request_context, available_nodes)
        client_lat = request_context.geo_location['lat']
        client_lng = request_context.geo_location['lng']
        # Compute the distance to each node
        node_distances = []
        for node in available_nodes:
            region = node['config']['region']
            # Look up the region's coordinates (simplified)
            region_coords = self._get_region_coordinates(region)
            if region_coords:
                distance = self._calculate_distance(
                    client_lat, client_lng,
                    region_coords['lat'], region_coords['lng']
                )
                node_distances.append({
                    'node': node,
                    'distance': distance
                })
            else:
                # Use a default distance when coordinates are unknown
                node_distances.append({
                    'node': node,
                    'distance': 10000  # 10,000 km
                })
        # Pick the nearest node
        best_item = min(node_distances, key=lambda item: item['distance'])
        return SchedulingDecision(
            selected_node_id=best_item['node']['node_id'],
            confidence_score=0.75,
            decision_time_ms=0.0,
            algorithm_used="geographic_proximity",
            reasoning=f"Geographic distance: {best_item['distance']:.1f}km"
        )
    def _get_region_coordinates(self, region: str) -> Optional[Dict[str, float]]:
        """Look up region coordinates (simplified implementation)."""
        region_coords = {
            'us-east-1': {'lat': 39.0458, 'lng': -76.6413},   # Virginia
            'us-west-1': {'lat': 37.7749, 'lng': -122.4194},  # San Francisco
            'eu-west-1': {'lat': 53.4084, 'lng': -6.2917},    # Dublin
            'ap-southeast-1': {'lat': 1.3521, 'lng': 103.8198}  # Singapore
        }
        return region_coords.get(region)
    def _calculate_distance(self, lat1: float, lng1: float,
                            lat2: float, lng2: float) -> float:
        """Great-circle distance between two points, in kilometers."""
        # Haversine formula
        R = 6371  # Earth's radius in km
        dlat = math.radians(lat2 - lat1)
        dlng = math.radians(lng2 - lng1)
        a = (math.sin(dlat/2) * math.sin(dlat/2) +
             math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) *
             math.sin(dlng/2) * math.sin(dlng/2))
        c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a))
        return R * c
    async def _ml_schedule(self,
                           request_context: RequestContext,
                           available_nodes: List[Dict]) -> SchedulingDecision:
        """Machine-learning scheduling."""
        if not self.ml_model_trained:
            # Fall back to hybrid scheduling until the model is trained
            return await self._hybrid_schedule(request_context, available_nodes)
        try:
            # Extract features
            features_list = []
            for node in available_nodes:
                features = self._extract_features(request_context, node)
                features_list.append(features)
            # Scale the features
            features_array = np.array(features_list)
            features_scaled = self.feature_scaler.transform(features_array)
            # Predict response times
            predicted_times = self.ml_model.predict(features_scaled)
            # Pick the node with the lowest predicted response time
            best_index = np.argmin(predicted_times)
            selected_node = available_nodes[best_index]
            # Derive a confidence score
            confidence = 1.0 / (1.0 + predicted_times[best_index])
            return SchedulingDecision(
                selected_node_id=selected_node['node_id'],
                confidence_score=confidence,
                decision_time_ms=0.0,
                algorithm_used="machine_learning",
                reasoning=f"ML-predicted response time: {predicted_times[best_index]:.3f}s"
            )
        except Exception as e:
            print(f"ML scheduling failed: {e}")
            return await self._hybrid_schedule(request_context, available_nodes)
    def _extract_features(self, request_context: RequestContext, node: Dict) -> List[float]:
        """Extract ML features."""
        metrics = node.get('metrics', {})
        config = node.get('config', {})
        features = [
            # Node performance metrics
            metrics.get('active_connections', 0),
            metrics.get('average_response_time', 0),
            metrics.get('cpu_usage_percent', 0),
            metrics.get('memory_usage_percent', 0),
            metrics.get('success_requests', 0) / max(metrics.get('total_requests', 1), 1),
            # Request features
            request_context.request_size,
            request_context.priority,
            request_context.expected_response_size,
            request_context.timeout_seconds,
            request_context.retry_count,
            # Node configuration
            config.get('max_connections', 1000),
            config.get('max_bandwidth_mbps', 100),
        ]
        # Geographic distance feature
        if request_context.geo_location:
            region_coords = self._get_region_coordinates(config.get('region', ''))
            if region_coords:
                distance = self._calculate_distance(
                    request_context.geo_location['lat'],
                    request_context.geo_location['lng'],
                    region_coords['lat'],
                    region_coords['lng']
                )
                features.append(distance)
            else:
                features.append(10000)  # default distance
        else:
            features.append(10000)
        return features
    async def _hybrid_schedule(self,
                               request_context: RequestContext,
                               available_nodes: List[Dict]) -> SchedulingDecision:
        """Hybrid scheduling algorithm."""
        # Pick the best algorithm based on request priority and features
        # High-priority requests use least-response-time
        if request_context.priority >= 8:
            return await self._least_response_time_schedule(request_context, available_nodes)
        # Prefer proximity scheduling when geo information is present
        if request_context.geo_location:
            geo_decision = await self._geographic_proximity_schedule(request_context, available_nodes)
            # Check the load on the nearest node
            selected_node_id = geo_decision.selected_node_id
            selected_node = next(
                node for node in available_nodes
                if node['node_id'] == selected_node_id
            )
            metrics = selected_node.get('metrics', {})
            cpu_usage = metrics.get('cpu_usage_percent', 0)
            active_connections = metrics.get('active_connections', 0)
            max_connections = selected_node['config'].get('max_connections', 1000)
            # If the nearest node is overloaded, use least-connections instead
            if cpu_usage > 80 or active_connections / max_connections > 0.8:
                return await self._least_connections_schedule(request_context, available_nodes)
            else:
                return geo_decision
        else:
            # Without geo information, use weighted round-robin
            return await self._weighted_round_robin_schedule(request_context, available_nodes)
    def _add_to_history(self, request_context: RequestContext, decision: SchedulingDecision):
        """Append to the scheduling history."""
        history_entry = {
            'timestamp': time.time(),
            'request_id': request_context.request_id,
            'selected_node_id': decision.selected_node_id,
            'algorithm': decision.algorithm_used,
            'confidence': decision.confidence_score,
            'decision_time_ms': decision.decision_time_ms
        }
        self.scheduling_history.append(history_entry)
        # Cap the history size
        if len(self.scheduling_history) > self.max_history_size:
            self.scheduling_history = self.scheduling_history[-self.max_history_size//2:]
    def _update_algorithm_performance(self, algorithm: SchedulingAlgorithm, decision: SchedulingDecision):
        """Update per-algorithm performance statistics."""
        algo_name = algorithm.value
        if algo_name not in self.algorithm_performance:
            self.algorithm_performance[algo_name] = {
                'total_decisions': 0,
                'total_decision_time': 0.0,
                'confidence_scores': []
            }
        stats = self.algorithm_performance[algo_name]
        stats['total_decisions'] += 1
        stats['total_decision_time'] += decision.decision_time_ms
        stats['confidence_scores'].append(decision.confidence_score)
        # Keep statistics for the most recent 1000 decisions only
        if len(stats['confidence_scores']) > 1000:
            stats['confidence_scores'] = stats['confidence_scores'][-500:]
    async def train_ml_model(self, training_data: List[Dict] = None):
        """Train the machine-learning model."""
        if training_data is None:
            training_data = self._prepare_training_data_from_history()
        if len(training_data) < 100:
            print("Not enough training data; at least 100 samples are required")
            return False
        try:
            # Prepare the training data
            X = []
            y = []
            for item in training_data:
                features = item['features']
                actual_response_time = item['actual_response_time']
                X.append(features)
                y.append(actual_response_time)
            X = np.array(X)
            y = np.array(y)
            # Scale the features
            X_scaled = self.feature_scaler.fit_transform(X)
            # Train the model
            self.ml_model = RandomForestRegressor(
                n_estimators=100,
                random_state=42,
                max_depth=10
            )
            self.ml_model.fit(X_scaled, y)
            # Evaluate the model
            train_score = self.ml_model.score(X_scaled, y)
            print(f"ML model trained, R² score: {train_score:.3f}")
            self.ml_model_trained = True
            return True
        except Exception as e:
            print(f"ML model training failed: {e}")
            return False
    def _prepare_training_data_from_history(self) -> List[Dict]:
        """Prepare training data from the history."""
        # This should extract samples from real request/response history;
        # simplified here to return no data
        return []
    def get_scheduler_stats(self) -> Dict[str, Any]:
        """Return scheduler statistics."""
        stats = {
            'total_decisions': len(self.scheduling_history),
            'algorithm_performance': {},
            'ml_model_trained': self.ml_model_trained
        }
        # Compute per-algorithm statistics
for algo_name, perf_data in self.algorithm_performance.items():
avg_decision_time = (
perf_data['total_decision_time'] / max(perf_data['total_decisions'], 1)
)
avg_confidence = np.mean(perf_data['confidence_scores'])
stats['algorithm_performance'][algo_name] = {
'total_decisions': perf_data['total_decisions'],
'avg_decision_time_ms': avg_decision_time,
'avg_confidence_score': avg_confidence
}
return stats
# Intelligent scheduling example
async def intelligent_scheduling_example():
    """Intelligent scheduling example."""
    # Create the cluster manager and scheduler
    cluster_manager = ProxyClusterManager()
    scheduler = IntelligentProxyScheduler(cluster_manager)
    try:
        await cluster_manager.start()
        # Register test nodes
test_nodes = [
{
'node_id': 'proxy-us-east-1',
'status': 'healthy',
'config': {
'region': 'us-east-1',
'datacenter': 'dc-01',
'max_connections': 1000,
'max_bandwidth_mbps': 100
},
'metrics': {
'active_connections': 50,
'total_requests': 1000,
'success_requests': 950,
'average_response_time': 0.15,
'cpu_usage_percent': 30.0,
'memory_usage_percent': 60.0
}
},
{
'node_id': 'proxy-us-west-1',
'status': 'healthy',
'config': {
'region': 'us-west-1',
'datacenter': 'dc-02',
'max_connections': 1000,
'max_bandwidth_mbps': 100
},
'metrics': {
'active_connections': 80,
'total_requests': 1200,
'success_requests': 1140,
'average_response_time': 0.12,
'cpu_usage_percent': 45.0,
'memory_usage_percent': 70.0
}
}
]
for node_info in test_nodes:
await cluster_manager.register_node(node_info)
        # Exercise the different scheduling algorithms
test_requests = [
RequestContext(
request_id='req-001',
client_ip='192.168.1.100',
target_url='https://api.example.com/data',
request_method='GET',
request_size=1024,
priority=5,
            geo_location={'lat': 40.7128, 'lng': -74.0060}  # New York
),
RequestContext(
request_id='req-002',
client_ip='10.0.0.50',
target_url='https://api.example.com/upload',
request_method='POST',
request_size=10240,
priority=8,
expected_response_size=500
)
]
algorithms_to_test = [
SchedulingAlgorithm.ROUND_ROBIN,
SchedulingAlgorithm.WEIGHTED_ROUND_ROBIN,
SchedulingAlgorithm.LEAST_CONNECTIONS,
SchedulingAlgorithm.LEAST_RESPONSE_TIME,
SchedulingAlgorithm.GEOGRAPHIC_PROXIMITY,
SchedulingAlgorithm.HYBRID
]
print("测试不同调度算法:")
print("="*60)
for algorithm in algorithms_to_test:
print(f"\n算法: {algorithm.value}")
for i, request in enumerate(test_requests):
decision = await scheduler.schedule_request(request, algorithm)
print(f" 请求{i+1}: {decision.selected_node_id}")
print(f" 置信度: {decision.confidence_score:.2f}")
print(f" 决策时间: {decision.decision_time_ms:.2f}ms")
print(f" 理由: {decision.reasoning}")
# 显示调度器统计
stats = scheduler.get_scheduler_stats()
print(f"\n调度器统计:")
print(f"总决策数: {stats['total_decisions']}")
print(f"算法性能:")
for algo_name, perf in stats['algorithm_performance'].items():
print(f" {algo_name}:")
print(f" 决策次数: {perf['total_decisions']}")
print(f" 平均决策时间: {perf['avg_decision_time_ms']:.2f}ms")
print(f" 平均置信度: {perf['avg_confidence_score']:.2f}")
finally:
await cluster_manager.shutdown()
if __name__ == "__main__":
asyncio.run(intelligent_scheduling_example())
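Note that the scheduler only returns a SchedulingDecision; executing the request is left to the caller. A minimal dispatch helper is sketched below, assuming a hypothetical in-process `node_registry` dict mapping node IDs to live DistributedProxyNode objects (in a real cluster this step would be an RPC to the chosen node):

from typing import Dict

async def dispatch_via_scheduler(scheduler: IntelligentProxyScheduler,
                                 node_registry: Dict[str, DistributedProxyNode],
                                 context: RequestContext) -> Dict:
    """Schedule the request, then execute it on the selected node."""
    decision = await scheduler.schedule_request(context)
    node = node_registry.get(decision.selected_node_id)
    if node is None:
        raise RuntimeError(f"Scheduled node {decision.selected_node_id} is not registered locally")
    return await node.handle_proxy_request({
        'url': context.target_url,
        'method': context.request_method,
        'headers': {'X-Request-ID': context.request_id}
    })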
⚡ High-Performance Load Balancer
3.1 Multi-Tier Load-Balancing Architecture
Build a high-performance, highly available load-balancing system that supports multiple balancing strategies and failover:
import asyncio
import aiohttp
from aiohttp import web
import time
import hashlib
from typing import Dict, List, Optional, Any, Tuple
from dataclasses import dataclass, field
from enum import Enum
import json
import logging
import random
import statistics
from collections import defaultdict, deque
import heapq
class LoadBalancingStrategy(Enum):
"""负载均衡策略"""
ROUND_ROBIN = "round_robin"
WEIGHTED_ROUND_ROBIN = "weighted_round_robin"
LEAST_CONNECTIONS = "least_connections"
IP_HASH = "ip_hash"
URL_HASH = "url_hash"
CONSISTENT_HASH = "consistent_hash"
ADAPTIVE = "adaptive"
class HealthStatus(Enum):
"""健康状态"""
HEALTHY = "healthy"
DEGRADED = "degraded"
UNHEALTHY = "unhealthy"
MAINTENANCE = "maintenance"
@dataclass
class BackendServer:
"""后端服务器"""
server_id: str
host: str
port: int
weight: int = 1
max_connections: int = 1000
current_connections: int = 0
health_status: HealthStatus = HealthStatus.HEALTHY
response_times: deque = field(default_factory=lambda: deque(maxlen=100))
error_count: int = 0
last_health_check: float = 0.0
total_requests: int = 0
successful_requests: int = 0
    @property
    def avg_response_time(self) -> float:
        """Average response time."""
        return statistics.mean(self.response_times) if self.response_times else 0.0
    @property
    def success_rate(self) -> float:
        """Success rate."""
        return self.successful_requests / max(self.total_requests, 1)
    @property
    def load_factor(self) -> float:
        """Load factor (0-1)."""
        return self.current_connections / self.max_connections
class HighPerformanceLoadBalancer:
"""高性能负载均衡器"""
def __init__(self, strategy: LoadBalancingStrategy = LoadBalancingStrategy.ADAPTIVE):
self.strategy = strategy
self.servers: Dict[str, BackendServer] = {}
        # Round-robin counter
        self.round_robin_index = 0
        # Consistent-hash ring
        self.hash_ring = {}
        self.ring_keys = []
        # Connection pools
        self.connection_pools = {}
        # Health checking
        self.health_checker_task = None
        self.health_check_interval = 30
        # Performance statistics
        self.stats = {
            'total_requests': 0,
            'successful_requests': 0,
            'failed_requests': 0,
            'avg_response_time': 0.0,
            'requests_per_second': 0.0
        }
        # Request history (for QPS calculation)
        self.request_history = deque(maxlen=1000)
        # Circuit breakers
        self.circuit_breakers = {}
        # Rate limiters (see the token-bucket sketch after this section)
        self.rate_limiters = {}
    async def add_server(self, server: BackendServer):
        """Add a backend server."""
        self.servers[server.server_id] = server
        # Create a connection pool
        connector = aiohttp.TCPConnector(
            limit=server.max_connections,
            limit_per_host=server.max_connections,
            ttl_dns_cache=300
        )
        self.connection_pools[server.server_id] = aiohttp.ClientSession(
            connector=connector,
            timeout=aiohttp.ClientTimeout(total=30)
        )
        # Update the consistent-hash ring
        self._update_hash_ring()
        # Initialize the circuit breaker
        self.circuit_breakers[server.server_id] = CircuitBreaker(
            failure_threshold=5,
            recovery_timeout=60,
            expected_exception=Exception
        )
        print(f"Added server: {server.server_id} ({server.host}:{server.port})")
    async def remove_server(self, server_id: str):
        """Remove a backend server."""
        if server_id in self.servers:
            # Close the connection pool
            if server_id in self.connection_pools:
                await self.connection_pools[server_id].close()
                del self.connection_pools[server_id]
            # Remove the server
            del self.servers[server_id]
            # Remove the circuit breaker
            if server_id in self.circuit_breakers:
                del self.circuit_breakers[server_id]
            # Update the hash ring
            self._update_hash_ring()
            print(f"Removed server: {server_id}")
    async def handle_request(self, request_data: Dict) -> Dict[str, Any]:
        """Handle a load-balanced request."""
        start_time = time.time()
        selected_server = None
        try:
            # Select a backend server
            selected_server = await self._select_server(request_data)
            if not selected_server:
                raise Exception("No backend servers available")
            # Check the circuit-breaker state
            circuit_breaker = self.circuit_breakers[selected_server.server_id]
            if not circuit_breaker.can_execute():
                raise Exception(f"Server {selected_server.server_id} circuit is open")
            # Forward the request
            result = await self._forward_request(selected_server, request_data)
            # Record the success
            response_time = time.time() - start_time
            self._record_success(selected_server, response_time)
            circuit_breaker.record_success()
            return result
        except Exception as e:
            # Record the failure
            response_time = time.time() - start_time
            self._record_failure(selected_server, response_time)
            if selected_server:
                self.circuit_breakers[selected_server.server_id].record_failure()
                # Attempt failover
                fallback_result = await self._try_fallback(request_data, selected_server.server_id)
                if fallback_result:
                    return fallback_result
            raise Exception(f"Request handling failed: {e}")
async def _select_server(self, request_data: Dict) -> Optional[BackendServer]:
"""选择后端服务器"""
healthy_servers = [
server for server in self.servers.values()
if server.health_status in [HealthStatus.HEALTHY, HealthStatus.DEGRADED]
]
if not healthy_servers:
return None
if self.strategy == LoadBalancingStrategy.ROUND_ROBIN:
return self._round_robin_select(healthy_servers)
elif self.strategy == LoadBalancingStrategy.WEIGHTED_ROUND_ROBIN:
return self._weighted_round_robin_select(healthy_servers)
elif self.strategy == LoadBalancingStrategy.LEAST_CONNECTIONS:
return self._least_connections_select(healthy_servers)
elif self.strategy == LoadBalancingStrategy.IP_HASH:
return self._ip_hash_select(healthy_servers, request_data.get('client_ip', ''))
elif self.strategy == LoadBalancingStrategy.URL_HASH:
return self._url_hash_select(healthy_servers, request_data.get('url', ''))
elif self.strategy == LoadBalancingStrategy.CONSISTENT_HASH:
return self._consistent_hash_select(request_data.get('client_ip', ''))
elif self.strategy == LoadBalancingStrategy.ADAPTIVE:
return self._adaptive_select(healthy_servers, request_data)
else:
return self._round_robin_select(healthy_servers)
def _round_robin_select(self, servers: List[BackendServer]) -> BackendServer:
"""轮询选择"""
server = servers[self.round_robin_index % len(servers)]
self.round_robin_index += 1
return server
    def _weighted_round_robin_select(self, servers: List[BackendServer]) -> BackendServer:
        """Weighted round-robin selection."""
        total_weight = sum(server.weight for server in servers)
        # Use the timestamp as a rotating target to keep the distribution even
        target = (int(time.time() * 1000) % total_weight)
        current = 0
        for server in servers:
            current += server.weight
            if current > target:
                return server
        return servers[0]
def _least_connections_select(self, servers: List[BackendServer]) -> BackendServer:
"""最少连接选择"""
return min(servers, key=lambda s: s.current_connections / s.weight)
def _ip_hash_select(self, servers: List[BackendServer], client_ip: str) -> BackendServer:
"""IP哈希选择"""
hash_value = int(hashlib.md5(client_ip.encode()).hexdigest(), 16)
return servers[hash_value % len(servers)]
def _url_hash_select(self, servers: List[BackendServer], url: str) -> BackendServer:
"""URL哈希选择"""
hash_value = int(hashlib.md5(url.encode()).hexdigest(), 16)
return servers[hash_value % len(servers)]
    def _consistent_hash_select(self, key: str) -> Optional[BackendServer]:
        """Consistent-hash selection."""
        if not self.ring_keys:
            return None
        hash_value = int(hashlib.md5(key.encode()).hexdigest(), 16)
        # Find the first ring node with a hash >= the key's hash
        for ring_key in self.ring_keys:
            if ring_key >= hash_value:
                server_id = self.hash_ring[ring_key]
                return self.servers.get(server_id)
        # Wrap around to the first node if none was found
        server_id = self.hash_ring[self.ring_keys[0]]
        return self.servers.get(server_id)
    def _adaptive_select(self, servers: List[BackendServer], request_data: Dict) -> BackendServer:
        """Adaptive selection."""
        # Score servers on response time, connections, and success rate
        scored_servers = []
        for server in servers:
            # Compute a composite score (lower is better)
            response_time_factor = server.avg_response_time
            connection_factor = server.load_factor
            error_factor = 1.0 - server.success_rate
            # Factor weights
            score = (
                response_time_factor * 0.4 +
                connection_factor * 0.3 +
                error_factor * 0.3
            ) / server.weight
            scored_servers.append((score, server))
        # Pick the lowest-scoring server
        scored_servers.sort(key=lambda x: x[0])
        return scored_servers[0][1]
    def _update_hash_ring(self):
        """Rebuild the consistent-hash ring."""
        self.hash_ring = {}
        self.ring_keys = []
        # Create multiple virtual nodes per server
        virtual_nodes = 100
        for server_id, server in self.servers.items():
            if server.health_status == HealthStatus.HEALTHY:
                for i in range(virtual_nodes):
                    virtual_key = f"{server_id}:{i}"
                    hash_value = int(hashlib.md5(virtual_key.encode()).hexdigest(), 16)
                    self.hash_ring[hash_value] = server_id
                    self.ring_keys.append(hash_value)
        self.ring_keys.sort()
    async def _forward_request(self, server: BackendServer, request_data: Dict) -> Dict[str, Any]:
        """Forward a request to a backend server."""
        session = self.connection_pools[server.server_id]
        # Increment the connection count
        server.current_connections += 1
        server.total_requests += 1
        try:
            url = f"http://{server.host}:{server.port}{request_data.get('path', '/')}"
            method = request_data.get('method', 'GET')
            headers = request_data.get('headers', {})
            data = request_data.get('data')
            # Add load-balancer headers
            headers['X-Forwarded-For'] = request_data.get('client_ip', '')
            headers['X-Load-Balancer'] = 'HighPerformanceLB/1.0'
            headers['X-Backend-Server'] = server.server_id
            async with session.request(
                method=method,
                url=url,
                headers=headers,
                data=data
            ) as response:
                content = await response.read()
                # Record the successful request
                server.successful_requests += 1
                return {
                    'status_code': response.status,
                    'headers': dict(response.headers),
                    'content': content.decode('utf-8', errors='ignore'),
                    'server_id': server.server_id,
                    'server_host': f"{server.host}:{server.port}"
                }
        finally:
            # Decrement the connection count
            server.current_connections -= 1
    async def _try_fallback(self, request_data: Dict, failed_server_id: str) -> Optional[Dict[str, Any]]:
        """Attempt failover to another server."""
        # Gather the other available servers
        available_servers = [
            server for server in self.servers.values()
            if (server.server_id != failed_server_id and
                server.health_status == HealthStatus.HEALTHY)
        ]
        if not available_servers:
            return None
        # Pick the best backup server
        backup_server = min(available_servers, key=lambda s: s.current_connections)
        try:
            return await self._forward_request(backup_server, request_data)
        except Exception as e:
            print(f"Failover attempt also failed: {e}")
            return None
    def _record_success(self, server: BackendServer, response_time: float):
        """Record a successful request."""
        server.response_times.append(response_time)
        server.error_count = max(0, server.error_count - 1)  # decay the error count
        # Update the global statistics
        self.stats['total_requests'] += 1
        self.stats['successful_requests'] += 1
        # Record the request time (for QPS calculation)
        self.request_history.append(time.time())
    def _record_failure(self, server: Optional[BackendServer], response_time: float):
        """Record a failed request."""
        if server:
            server.error_count += 1
            server.response_times.append(response_time)
        # Update the global statistics
        self.stats['total_requests'] += 1
        self.stats['failed_requests'] += 1
        # Record the request time
        self.request_history.append(time.time())
    async def start_health_checker(self):
        """Start the health checker."""
        self.health_checker_task = asyncio.create_task(self._health_check_loop())
    async def _health_check_loop(self):
        """Health-check loop."""
        while True:
            try:
                await asyncio.sleep(self.health_check_interval)
                # Check all servers concurrently
                health_check_tasks = [
                    self._check_server_health(server)
                    for server in self.servers.values()
                ]
                await asyncio.gather(*health_check_tasks, return_exceptions=True)
            except Exception as e:
                print(f"Health-check loop error: {e}")
    async def _check_server_health(self, server: BackendServer):
        """Check the health of a single server."""
        try:
            session = self.connection_pools[server.server_id]
            health_url = f"http://{server.host}:{server.port}/health"
            start_time = time.time()
            async with session.get(health_url, timeout=aiohttp.ClientTimeout(total=10)) as response:
                response_time = time.time() - start_time
                if response.status == 200:
                    # Grade health by response time and success rate
                    if response_time < 1.0 and server.success_rate > 0.95:
                        server.health_status = HealthStatus.HEALTHY
                    elif response_time < 3.0 and server.success_rate > 0.8:
                        server.health_status = HealthStatus.DEGRADED
                    else:
                        server.health_status = HealthStatus.UNHEALTHY
                else:
                    server.health_status = HealthStatus.UNHEALTHY
                server.last_health_check = time.time()
        except Exception as e:
            print(f"Health check failed for {server.server_id}: {e}")
            server.health_status = HealthStatus.UNHEALTHY
            server.last_health_check = time.time()
    def get_stats(self) -> Dict[str, Any]:
        """Return load-balancer statistics."""
        # Compute QPS
        current_time = time.time()
        recent_requests = [
            req_time for req_time in self.request_history
            if current_time - req_time <= 60  # last minute
        ]
        qps = len(recent_requests) / 60.0
        # Compute the average response time
        all_response_times = []
        for server in self.servers.values():
            all_response_times.extend(server.response_times)
        avg_response_time = statistics.mean(all_response_times) if all_response_times else 0.0
return {
'strategy': self.strategy.value,
'total_servers': len(self.servers),
'healthy_servers': len([s for s in self.servers.values() if s.health_status == HealthStatus.HEALTHY]),
'degraded_servers': len([s for s in self.servers.values() if s.health_status == HealthStatus.DEGRADED]),
'unhealthy_servers': len([s for s in self.servers.values() if s.health_status == HealthStatus.UNHEALTHY]),
'total_requests': self.stats['total_requests'],
'successful_requests': self.stats['successful_requests'],
'failed_requests': self.stats['failed_requests'],
'success_rate': self.stats['successful_requests'] / max(self.stats['total_requests'], 1),
'requests_per_second': qps,
'avg_response_time': avg_response_time,
'server_details': {
server_id: {
'host': f"{server.host}:{server.port}",
'health_status': server.health_status.value,
'current_connections': server.current_connections,
'total_requests': server.total_requests,
'success_rate': server.success_rate,
'avg_response_time': server.avg_response_time,
'load_factor': server.load_factor
}
for server_id, server in self.servers.items()
}
}
    async def shutdown(self):
        """Shut down the load balancer."""
        try:
            # Stop health checking
            if self.health_checker_task:
                self.health_checker_task.cancel()
            # Close all connection pools
            for session in self.connection_pools.values():
                await session.close()
            print("Load balancer shut down")
        except Exception as e:
            print(f"Failed to shut down load balancer: {e}")
class CircuitBreaker:
"""熔断器"""
def __init__(self, failure_threshold: int = 5, recovery_timeout: int = 60, expected_exception=Exception):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.expected_exception = expected_exception
self.failure_count = 0
self.last_failure_time = None
self.state = 'CLOSED' # CLOSED, OPEN, HALF_OPEN
    def can_execute(self) -> bool:
        """Check whether a call may proceed."""
        if self.state == 'CLOSED':
            return True
        elif self.state == 'OPEN':
            # Check whether we can move to half-open
            if (time.time() - self.last_failure_time) >= self.recovery_timeout:
                self.state = 'HALF_OPEN'
                return True
            return False
        elif self.state == 'HALF_OPEN':
            return True
        return False
    def record_success(self):
        """Record a success."""
        self.failure_count = 0
        self.state = 'CLOSED'
    def record_failure(self):
        """Record a failure."""
        self.failure_count += 1
        self.last_failure_time = time.time()
        # A failure while HALF_OPEN reopens the breaker immediately
        if self.state == 'HALF_OPEN' or self.failure_count >= self.failure_threshold:
            self.state = 'OPEN'
# HTTP server front-end
class LoadBalancerHTTPServer:
    """HTTP front-end for the load balancer."""
def __init__(self, load_balancer: HighPerformanceLoadBalancer, port: int = 8080):
self.load_balancer = load_balancer
self.port = port
self.app = web.Application()
        # Set up routes; register the specific management routes before the
        # catch-all proxy route so they are not shadowed by it
        self.app.router.add_get('/lb-stats', self.handle_stats)
        self.app.router.add_get('/lb-health', self.handle_health)
        self.app.router.add_route('*', '/{path:.*}', self.handle_proxy_request)
    async def handle_proxy_request(self, request: web.Request) -> web.Response:
        """Handle a proxied request."""
        try:
            # Build the request payload
            request_data = {
                'method': request.method,
                'path': request.path_qs,
                'headers': dict(request.headers),
                'client_ip': request.remote,
                'url': str(request.url),
                'data': await request.read() if request.method in ['POST', 'PUT', 'PATCH'] else None
            }
            # Route through the load balancer
            result = await self.load_balancer.handle_request(request_data)
            # Build the response; drop headers that aiohttp manages itself
            # so they do not conflict with the re-encoded text body
            excluded = {'content-length', 'transfer-encoding', 'content-encoding', 'connection'}
            headers = {
                k: v for k, v in result['headers'].items()
                if k.lower() not in excluded
            }
            response = web.Response(
                text=result['content'],
                status=result['status_code'],
                headers=headers
            )
            # Add load-balancer information headers
            response.headers['X-Backend-Server'] = result['server_id']
            response.headers['X-Server-Host'] = result['server_host']
            return response
        except Exception as e:
            return web.Response(
                text=f"Load balancer error: {str(e)}",
                status=503,
                headers={'Content-Type': 'text/plain'}
            )
async def handle_stats(self, request: web.Request) -> web.Response:
"""处理统计请求"""
stats = self.load_balancer.get_stats()
return web.json_response(stats)
async def handle_health(self, request: web.Request) -> web.Response:
"""处理健康检查请求"""
stats = self.load_balancer.get_stats()
if stats['healthy_servers'] > 0:
return web.json_response({'status': 'healthy', 'healthy_servers': stats['healthy_servers']})
else:
return web.json_response({'status': 'unhealthy', 'healthy_servers': 0}, status=503)
    async def start(self):
        """Start the HTTP server."""
        runner = web.AppRunner(self.app)
        await runner.setup()
        site = web.TCPSite(runner, '0.0.0.0', self.port)
        await site.start()
        print(f"Load balancer HTTP server listening on port {self.port}")
# Load-balancer example
async def load_balancer_example():
    """Load balancer example."""
    # Create the load balancer
    lb = HighPerformanceLoadBalancer(LoadBalancingStrategy.ADAPTIVE)
    # Add backend servers
    servers = [
        BackendServer('server-01', '127.0.0.1', 8001, weight=3),
        BackendServer('server-02', '127.0.0.1', 8002, weight=2),
        BackendServer('server-03', '127.0.0.1', 8003, weight=1),
    ]
    for server in servers:
        await lb.add_server(server)
    # Start health checking
    await lb.start_health_checker()
    # Create the HTTP server
    http_server = LoadBalancerHTTPServer(lb, 8080)
    await http_server.start()
    try:
        # Simulate some requests
        test_requests = [
            {
                'method': 'GET',
                'path': '/api/data',
                'headers': {'User-Agent': 'TestClient'},
                'client_ip': f'192.168.1.{100+i}',
                'url': '/api/data'
            }
            for i in range(10)
        ]
        # Send the requests concurrently
        tasks = [lb.handle_request(req) for req in test_requests]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        # Summarize the results
        successful = sum(1 for r in results if not isinstance(r, Exception))
        print(f"Handled {successful}/{len(results)} requests successfully")
        # Show load-balancer statistics
        stats = lb.get_stats()
        print(f"\nLoad balancer statistics:")
        print(f"Strategy: {stats['strategy']}")
        print(f"Total servers: {stats['total_servers']}")
        print(f"Healthy servers: {stats['healthy_servers']}")
        print(f"Total requests: {stats['total_requests']}")
        print(f"Success rate: {stats['success_rate']:.2%}")
        print(f"QPS: {stats['requests_per_second']:.2f}")
        print(f"Average response time: {stats['avg_response_time']:.3f}s")
        print(f"\nServer details:")
        for server_id, details in stats['server_details'].items():
            print(f"  {server_id}: {details['health_status']}, "
                  f"connections: {details['current_connections']}, "
                  f"success rate: {details['success_rate']:.2%}")
        # Keep running
        print("\nLoad balancer running; press Ctrl+C to stop...")
        while True:
            await asyncio.sleep(10)
            current_stats = lb.get_stats()
            print(f"QPS: {current_stats['requests_per_second']:.2f}, "
                  f"success rate: {current_stats['success_rate']:.2%}")
    except KeyboardInterrupt:
        print("\nStopping load balancer...")
    finally:
        await lb.shutdown()
if __name__ == "__main__":
asyncio.run(load_balancer_example())
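HighPerformanceLoadBalancer declares a `self.rate_limiters` dict that the code above never populates. One conventional way to fill that slot is a per-client token bucket; the sketch below is an assumption about the intended design, not part of the original:

import time

class TokenBucketRateLimiter:
    """Token bucket: sustains `rate` requests/second with bursts up to `capacity`."""
    def __init__(self, rate: float, capacity: float):
        self.rate = rate          # tokens replenished per second
        self.capacity = capacity  # maximum burst size
        self.tokens = capacity
        self.last_refill = time.monotonic()
    def allow(self) -> bool:
        now = time.monotonic()
        # Refill in proportion to elapsed time, capped at capacity
        self.tokens = min(self.capacity, self.tokens + (now - self.last_refill) * self.rate)
        self.last_refill = now
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False

# Hypothetical wiring at the top of handle_request, before server selection:
#   limiter = self.rate_limiters.setdefault(client_ip, TokenBucketRateLimiter(100, 200))
#   if not limiter.allow():
#       raise Exception("Rate limit exceeded")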
🚀 Performance Optimization and Scaling
4.1 Connection Pool Optimization
Implement efficient connection-pool management with dynamic resizing and intelligent recycling:
import asyncio
import aiohttp
import time
from typing import Dict, List, Optional, Set
from dataclasses import dataclass
from collections import deque
@dataclass
class ConnectionPoolStats:
"""连接池统计"""
total_connections: int = 0
active_connections: int = 0
idle_connections: int = 0
failed_connections: int = 0
created_connections: int = 0
closed_connections: int = 0
pool_hits: int = 0
pool_misses: int = 0
class OptimizedConnectionPool:
"""优化的连接池"""
def __init__(self,
max_size: int = 100,
min_size: int = 10,
max_idle_time: int = 300,
cleanup_interval: int = 60):
self.max_size = max_size
self.min_size = min_size
self.max_idle_time = max_idle_time
self.cleanup_interval = cleanup_interval
        # The pool itself
        self._pool: deque = deque()
        self._active_connections: Set = set()
        self._connection_created_times: Dict = {}
        # Statistics
        self.stats = ConnectionPoolStats()
        # Lock and cleanup task
        self._lock = asyncio.Lock()
        self._cleanup_task = None
        self._closed = False
        # Connection factory
        self._connection_factory = None
    async def start(self, connection_factory):
        """Start the connection pool."""
        self._connection_factory = connection_factory
        # Pre-create the minimum number of connections
        for _ in range(self.min_size):
            conn = await self._create_connection()
            if conn:
                self._pool.append((conn, time.time()))
        # Start the cleanup task
        self._cleanup_task = asyncio.create_task(self._cleanup_loop())
    async def get_connection(self):
        """Acquire a connection."""
        async with self._lock:
            if self._closed:
                raise Exception("Connection pool is closed")
            # Try to take a connection from the pool
            while self._pool:
                conn, created_time = self._pool.popleft()
                # Check that the connection is still valid
                if await self._is_connection_valid(conn, created_time):
                    self._active_connections.add(conn)
                    self.stats.pool_hits += 1
                    self.stats.active_connections = len(self._active_connections)
                    self.stats.idle_connections = len(self._pool)
                    return conn
                else:
                    # Connection is stale; close it
                    await self._close_connection(conn)
            # Nothing usable in the pool; create a new connection
            if len(self._active_connections) < self.max_size:
                conn = await self._create_connection()
                if conn:
                    self._active_connections.add(conn)
                    self.stats.pool_misses += 1
                    self.stats.active_connections = len(self._active_connections)
                    return conn
            # Could not create a new connection
            raise Exception("Connection pool is full and cannot create a new connection")
    async def return_connection(self, conn):
        """Return a connection to the pool."""
        async with self._lock:
            if conn in self._active_connections:
                self._active_connections.remove(conn)
                # Check that the connection is still valid
                created_time = self._connection_created_times.get(id(conn), time.time())
                if (not self._closed and
                        await self._is_connection_valid(conn, created_time) and
                        len(self._pool) < self.max_size):
                    # Still valid; put it back into the pool
                    self._pool.append((conn, time.time()))
                else:
                    # Invalid or pool full; close the connection
                    await self._close_connection(conn)
            self.stats.active_connections = len(self._active_connections)
            self.stats.idle_connections = len(self._pool)
    async def _create_connection(self):
        """Create a new connection."""
        try:
            conn = await self._connection_factory()
            conn_id = id(conn)
            self._connection_created_times[conn_id] = time.time()
            self.stats.created_connections += 1
            self.stats.total_connections += 1
            return conn
        except Exception as e:
            self.stats.failed_connections += 1
            print(f"Failed to create connection: {e}")
            return None
async def _close_connection(self, conn):
"""关闭连接"""
try:
if hasattr(conn, 'close'):
if asyncio.iscoroutinefunction(conn.close):
await conn.close()
else:
conn.close()
# 清理连接创建时间记录
conn_id = id(conn)
self._connection_created_times.pop(conn_id, None)
self.stats.closed_connections += 1
self.stats.total_connections = max(0, self.stats.total_connections - 1)
except Exception as e:
print(f"关闭连接失败: {e}")
async def _is_connection_valid(self, conn, created_time: float) -> bool:
"""检查连接是否有效"""
# 检查连接年龄
if time.time() - created_time > self.max_idle_time:
return False
# 检查连接状态(简化实现)
if hasattr(conn, 'closed'):
return not conn.closed
return True
async def _cleanup_loop(self):
"""清理循环"""
while not self._closed:
try:
await asyncio.sleep(self.cleanup_interval)
await self._cleanup_idle_connections()
except Exception as e:
print(f"连接池清理错误: {e}")
async def _cleanup_idle_connections(self):
"""清理空闲连接"""
async with self._lock:
current_time = time.time()
connections_to_remove = []
# 检查空闲连接
for conn, idle_time in list(self._pool):
if (current_time - idle_time > self.max_idle_time or
not await self._is_connection_valid(conn,
self._connection_created_times.get(id(conn), idle_time))):
connections_to_remove.append((conn, idle_time))
# 移除过期连接
for conn, idle_time in connections_to_remove:
try:
self._pool.remove((conn, idle_time))
await self._close_connection(conn)
except ValueError:
pass # 连接可能已被移除
# 确保最小连接数
while (len(self._pool) + len(self._active_connections) < self.min_size and
len(self._pool) + len(self._active_connections) < self.max_size):
conn = await self._create_connection()
if conn:
self._pool.append((conn, time.time()))
else:
break
self.stats.idle_connections = len(self._pool)
def get_stats(self) -> ConnectionPoolStats:
"""获取连接池统计"""
self.stats.total_connections = len(self._pool) + len(self._active_connections)
self.stats.active_connections = len(self._active_connections)
self.stats.idle_connections = len(self._pool)
return self.stats
async def close(self):
"""关闭连接池"""
self._closed = True
# 停止清理任务
if self._cleanup_task:
self._cleanup_task.cancel()
# 关闭所有连接
async with self._lock:
# 关闭空闲连接
while self._pool:
conn, _ = self._pool.popleft()
await self._close_connection(conn)
# 关闭活跃连接
for conn in list(self._active_connections):
await self._close_connection(conn)
self._active_connections.clear()
class PerformanceOptimizedProxyCluster:
"""性能优化的代理集群"""
def __init__(self):
self.connection_pools: Dict[str, OptimizedConnectionPool] = {}
self.request_cache = {}
self.response_cache = {}
# 性能指标
self.metrics = {
'total_requests': 0,
'cache_hits': 0,
'cache_misses': 0,
'avg_response_time': 0.0,
'connection_pool_efficiency': 0.0
}
# 缓存配置
self.enable_request_cache = True
self.enable_response_cache = True
self.cache_ttl = 300 # 5分钟
# 性能监控
self.performance_monitor = None
async def create_optimized_pool(self, server_id: str, server_config: Dict) -> OptimizedConnectionPool:
"""为服务器创建优化的连接池"""
async def connection_factory():
connector = aiohttp.TCPConnector(
limit=server_config.get('max_connections', 100),
limit_per_host=server_config.get('max_connections', 100),
ttl_dns_cache=300,
use_dns_cache=True,
enable_cleanup_closed=True,
keepalive_timeout=30
)
return aiohttp.ClientSession(
connector=connector,
timeout=aiohttp.ClientTimeout(
total=server_config.get('timeout', 30),
connect=10,
sock_read=20
)
)
pool = OptimizedConnectionPool(
max_size=server_config.get('max_connections', 100),
min_size=server_config.get('min_connections', 10),
max_idle_time=server_config.get('max_idle_time', 300)
)
await pool.start(connection_factory)
self.connection_pools[server_id] = pool
return pool
async def handle_optimized_request(self, request_data: Dict) -> Dict[str, Any]:
"""处理优化的请求"""
start_time = time.time()
request_id = f"{request_data.get('method', 'GET')}:{request_data.get('url', '')}"
# 检查请求缓存
if self.enable_request_cache and request_id in self.request_cache:
cache_entry = self.request_cache[request_id]
if time.time() - cache_entry['timestamp'] < self.cache_ttl:
self.metrics['cache_hits'] += 1
return cache_entry['response']
self.metrics['cache_misses'] += 1
self.metrics['total_requests'] += 1
try:
# 选择最优服务器(简化实现)
server_id = self._select_optimal_server(request_data)
if server_id not in self.connection_pools:
raise Exception(f"服务器 {server_id} 的连接池不存在")
pool = self.connection_pools[server_id]
# 从连接池获取连接
session = await pool.get_connection()
try:
# 发送请求
response = await self._send_request(session, request_data)
# Cache the response so identical GETs within the TTL are served locally
# (guard with the same flag that the cache lookup checks)
if self.enable_request_cache and method == 'GET':
self.request_cache[request_id] = {
'response': response,
'timestamp': time.time()
}
# 更新性能指标
response_time = time.time() - start_time
self._update_performance_metrics(response_time)
return response
finally:
# 归还连接到池
await pool.return_connection(session)
except Exception as e:
response_time = time.time() - start_time
self._update_performance_metrics(response_time)
raise  # re-raise with the original traceback
def _select_optimal_server(self, request_data: Dict) -> str:
"""选择最优服务器(简化实现)"""
# 这里应该集成智能调度算法
# 简化实现:返回第一个可用的服务器
if self.connection_pools:
return next(iter(self.connection_pools.keys()))
else:
raise Exception("没有可用的服务器")
async def _send_request(self, session: aiohttp.ClientSession, request_data: Dict) -> Dict[str, Any]:
"""发送HTTP请求"""
method = request_data.get('method', 'GET')
url = request_data.get('url', '')
headers = request_data.get('headers', {})
data = request_data.get('data')
async with session.request(method, url, headers=headers, data=data) as response:
content = await response.read()
return {
'status_code': response.status,
'headers': dict(response.headers),
'content': content.decode('utf-8', errors='ignore'),
'response_time': response.headers.get('X-Response-Time', '0')
}
def _update_performance_metrics(self, response_time: float):
"""更新性能指标"""
# 计算移动平均响应时间
alpha = 0.1 # 平滑因子
if self.metrics['avg_response_time'] == 0:
self.metrics['avg_response_time'] = response_time
else:
self.metrics['avg_response_time'] = (
alpha * response_time +
(1 - alpha) * self.metrics['avg_response_time']
)
# 计算连接池效率
total_pool_efficiency = 0
for pool in self.connection_pools.values():
stats = pool.get_stats()
if stats.pool_hits + stats.pool_misses > 0:
efficiency = stats.pool_hits / (stats.pool_hits + stats.pool_misses)
total_pool_efficiency += efficiency
if self.connection_pools:
self.metrics['connection_pool_efficiency'] = total_pool_efficiency / len(self.connection_pools)
def get_performance_stats(self) -> Dict[str, Any]:
"""获取性能统计"""
cache_hit_rate = 0
if self.metrics['cache_hits'] + self.metrics['cache_misses'] > 0:
cache_hit_rate = self.metrics['cache_hits'] / (self.metrics['cache_hits'] + self.metrics['cache_misses'])
pool_stats = {}
for server_id, pool in self.connection_pools.items():
pool_stats[server_id] = pool.get_stats()
return {
'request_metrics': {
'total_requests': self.metrics['total_requests'],
'cache_hit_rate': cache_hit_rate,
'avg_response_time': self.metrics['avg_response_time'],
'connection_pool_efficiency': self.metrics['connection_pool_efficiency']
},
'pool_stats': pool_stats,
'cache_stats': {
'request_cache_size': len(self.request_cache),
'response_cache_enabled': self.enable_response_cache,
'cache_ttl': self.cache_ttl
}
}
async def shutdown(self):
"""关闭集群"""
for pool in self.connection_pools.values():
await pool.close()
self.connection_pools.clear()
# 性能优化示例
async def performance_optimization_example():
"""性能优化示例"""
cluster = PerformanceOptimizedProxyCluster()
try:
# 创建优化的连接池
server_configs = {
'server-01': {
'max_connections': 50,
'min_connections': 10,
'max_idle_time': 300,
'timeout': 30
},
'server-02': {
'max_connections': 30,
'min_connections': 5,
'max_idle_time': 300,
'timeout': 30
}
}
for server_id, config in server_configs.items():
await cluster.create_optimized_pool(server_id, config)
print("性能优化集群初始化完成")
# 模拟并发请求
test_requests = [
{
'method': 'GET',
'url': f'https://httpbin.org/delay/{i%3}', # 0-2秒延迟
'headers': {'User-Agent': 'PerformanceTest'}
}
for i in range(100)
]
print("开始性能测试...")
start_time = time.time()
# 并发执行请求
tasks = []
batch_size = 10
for i in range(0, len(test_requests), batch_size):
batch = test_requests[i:i + batch_size]
batch_tasks = [cluster.handle_optimized_request(req) for req in batch]
# 执行批次
results = await asyncio.gather(*batch_tasks, return_exceptions=True)
# 统计结果
successful = sum(1 for r in results if not isinstance(r, Exception))
print(f"批次 {i//batch_size + 1}: {successful}/{len(batch)} 成功")
# 短暂休息避免压力过大
await asyncio.sleep(0.1)
total_time = time.time() - start_time
# 显示性能统计
stats = cluster.get_performance_stats()
print(f"\n性能测试结果:")
print(f"总耗时: {total_time:.2f} 秒")
print(f"总请求数: {stats['request_metrics']['total_requests']}")
print(f"缓存命中率: {stats['request_metrics']['cache_hit_rate']:.2%}")
print(f"平均响应时间: {stats['request_metrics']['avg_response_time']:.3f} 秒")
print(f"连接池效率: {stats['request_metrics']['connection_pool_efficiency']:.2%}")
print(f"\n连接池统计:")
for server_id, pool_stat in stats['pool_stats'].items():
print(f" {server_id}:")
print(f" 总连接数: {pool_stat.total_connections}")
print(f" 活跃连接: {pool_stat.active_connections}")
print(f" 空闲连接: {pool_stat.idle_connections}")
print(f" 池命中数: {pool_stat.pool_hits}")
print(f" 池错失数: {pool_stat.pool_misses}")
# 运行一段时间观察性能
print("\n继续运行以观察性能...")
await asyncio.sleep(30)
finally:
await cluster.shutdown()
print("性能优化集群已关闭")
if __name__ == "__main__":
asyncio.run(performance_optimization_example())
🌍 Global Distributed Deployment
5.1 Multi-Region Cluster Architecture
Build a cross-region, highly available proxy cluster that supports proximity-based routing and automatic failover:
import asyncio
import math
import random
import time
from typing import Dict, List, Optional, Any, Tuple
from dataclasses import dataclass, field
from enum import Enum
class RegionStatus(Enum):
"""区域状态"""
ACTIVE = "active"
DEGRADED = "degraded"
MAINTENANCE = "maintenance"
OFFLINE = "offline"
@dataclass
class GeographicRegion:
"""地理区域"""
region_id: str
region_name: str
country_code: str
latitude: float
longitude: float
timezone: str
status: RegionStatus = RegionStatus.ACTIVE
# 网络配置
primary_datacenter: str = ""
backup_datacenters: List[str] = field(default_factory=list)
# 容量配置
max_capacity: int = 1000
current_load: int = 0
# 网络延迟(毫秒)
network_latency: Dict[str, float] = field(default_factory=dict)
@dataclass
class DataCenter:
"""数据中心"""
datacenter_id: str
datacenter_name: str
region_id: str
address: str
# 服务器列表
proxy_servers: List[str] = field(default_factory=list)
management_servers: List[str] = field(default_factory=list)
# 容量和状态
max_servers: int = 100
current_servers: int = 0
status: RegionStatus = RegionStatus.ACTIVE
# 性能指标
avg_response_time: float = 0.0
success_rate: float = 1.0
bandwidth_utilization: float = 0.0
class GlobalProxyClusterManager:
"""全球代理集群管理器"""
def __init__(self):
self.regions: Dict[str, GeographicRegion] = {}
self.datacenters: Dict[str, DataCenter] = {}
# 路由表和延迟矩阵
self.routing_table: Dict[str, List[str]] = {}
self.latency_matrix: Dict[Tuple[str, str], float] = {}
# 全局负载均衡器
self.global_load_balancer = None
# 故障检测和恢复
self.failure_detector = GlobalFailureDetector()
self.disaster_recovery = DisasterRecoveryManager()
# 数据同步
self.data_synchronizer = GlobalDataSynchronizer()
# 监控和告警
self.global_monitor = GlobalClusterMonitor()
async def initialize_global_cluster(self):
"""初始化全球集群"""
# 定义地理区域
regions_config = [
{
'region_id': 'us-east-1',
'region_name': 'US East (Virginia)',
'country_code': 'US',
'latitude': 39.0458,
'longitude': -76.6413,
'timezone': 'America/New_York',
'primary_datacenter': 'us-east-1a',
'backup_datacenters': ['us-east-1b', 'us-east-1c']
},
{
'region_id': 'us-west-1',
'region_name': 'US West (California)',
'country_code': 'US',
'latitude': 37.7749,
'longitude': -122.4194,
'timezone': 'America/Los_Angeles',
'primary_datacenter': 'us-west-1a',
'backup_datacenters': ['us-west-1b']
},
{
'region_id': 'eu-west-1',
'region_name': 'Europe (Ireland)',
'country_code': 'IE',
'latitude': 53.4084,
'longitude': -6.2917,
'timezone': 'Europe/Dublin',
'primary_datacenter': 'eu-west-1a',
'backup_datacenters': ['eu-west-1b', 'eu-west-1c']
},
{
'region_id': 'ap-southeast-1',
'region_name': 'Asia Pacific (Singapore)',
'country_code': 'SG',
'latitude': 1.3521,
'longitude': 103.8198,
'timezone': 'Asia/Singapore',
'primary_datacenter': 'ap-southeast-1a',
'backup_datacenters': ['ap-southeast-1b']
},
{
'region_id': 'ap-northeast-1',
'region_name': 'Asia Pacific (Tokyo)',
'country_code': 'JP',
'latitude': 35.6762,
'longitude': 139.6503,
'timezone': 'Asia/Tokyo',
'primary_datacenter': 'ap-northeast-1a',
'backup_datacenters': ['ap-northeast-1b']
}
]
# 初始化区域
for region_config in regions_config:
region = GeographicRegion(**region_config)
self.regions[region.region_id] = region
# 创建数据中心
for dc_suffix in [region.primary_datacenter] + region.backup_datacenters:
dc = DataCenter(
datacenter_id=dc_suffix,
datacenter_name=f"{region.region_name} - {dc_suffix}",
region_id=region.region_id,
address=f"datacenter.{dc_suffix}.example.com"
)
self.datacenters[dc_suffix] = dc
# 计算区域间延迟矩阵
await self._calculate_latency_matrix()
# 构建路由表
await self._build_routing_table()
# 启动组件
await self.failure_detector.start()
await self.disaster_recovery.start()
await self.data_synchronizer.start()
await self.global_monitor.start()
print("全球代理集群初始化完成")
def find_nearest_region(self, client_latitude: float, client_longitude: float) -> Optional[GeographicRegion]:
"""找到最近的区域"""
min_distance = float('inf')
nearest_region = None
for region in self.regions.values():
if region.status != RegionStatus.ACTIVE:
continue
# 计算地理距离
distance = self._calculate_distance(
client_latitude, client_longitude,
region.latitude, region.longitude
)
# 考虑网络延迟和负载
load_factor = region.current_load / region.max_capacity
adjusted_distance = distance * (1 + load_factor)
if adjusted_distance < min_distance:
min_distance = adjusted_distance
nearest_region = region
return nearest_region
def _calculate_distance(self, lat1: float, lng1: float, lat2: float, lng2: float) -> float:
"""计算两点间距离(公里)"""
R = 6371 # 地球半径
dlat = math.radians(lat2 - lat1)
dlng = math.radians(lng2 - lng1)
a = (math.sin(dlat/2) * math.sin(dlat/2) +
math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) *
math.sin(dlng/2) * math.sin(dlng/2))
c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a))
return R * c
async def _calculate_latency_matrix(self):
"""计算区域间延迟矩阵"""
print("计算区域间网络延迟...")
for region1_id, region1 in self.regions.items():
for region2_id, region2 in self.regions.items():
if region1_id == region2_id:
latency = 0.0
else:
# 基于地理距离估算延迟
distance = self._calculate_distance(
region1.latitude, region1.longitude,
region2.latitude, region2.longitude
)
# 简化公式:延迟 ≈ 距离/200 + 基础延迟
latency = distance / 200.0 + random.uniform(10, 30)
self.latency_matrix[(region1_id, region2_id)] = latency
region1.network_latency[region2_id] = latency
async def _build_routing_table(self):
"""构建路由表"""
print("构建全球路由表...")
for region_id in self.regions.keys():
# 为每个区域构建按延迟排序的路由列表
other_regions = [r for r in self.regions.keys() if r != region_id]
# 按延迟排序
sorted_regions = sorted(
other_regions,
key=lambda r: self.latency_matrix.get((region_id, r), float('inf'))
)
self.routing_table[region_id] = sorted_regions
async def route_request(self, client_location: Dict[str, float], request_data: Dict) -> Dict[str, Any]:
"""全球路由请求"""
# 找到最近的区域
nearest_region = self.find_nearest_region(
client_location['latitude'],
client_location['longitude']
)
if not nearest_region:
raise Exception("没有可用的区域")
try:
# 尝试在最近区域处理请求
result = await self._handle_request_in_region(nearest_region.region_id, request_data)
return {
'success': True,
'data': result,
'processed_region': nearest_region.region_id,
'processing_time': result.get('processing_time', 0)
}
except Exception as e:
print(f"区域 {nearest_region.region_id} 处理失败: {e}")
# 故障转移到备用区域
backup_regions = self.routing_table.get(nearest_region.region_id, [])
for backup_region_id in backup_regions:
backup_region = self.regions.get(backup_region_id)
if backup_region and backup_region.status == RegionStatus.ACTIVE:
try:
result = await self._handle_request_in_region(backup_region_id, request_data)
return {
'success': True,
'data': result,
'processed_region': backup_region_id,
'fallback_from': nearest_region.region_id,
'processing_time': result.get('processing_time', 0)
}
except Exception as backup_e:
print(f"备用区域 {backup_region_id} 也失败: {backup_e}")
continue
# 所有区域都失败
raise Exception("所有区域都不可用")
async def _handle_request_in_region(self, region_id: str, request_data: Dict) -> Dict[str, Any]:
"""在指定区域处理请求"""
region = self.regions.get(region_id)
if not region:
raise Exception(f"区域 {region_id} 不存在")
start_time = time.time()
# 选择数据中心
primary_dc_id = region.primary_datacenter
primary_dc = self.datacenters.get(primary_dc_id)
if not primary_dc or primary_dc.status != RegionStatus.ACTIVE:
# 尝试备用数据中心
for backup_dc_id in region.backup_datacenters:
backup_dc = self.datacenters.get(backup_dc_id)
if backup_dc and backup_dc.status == RegionStatus.ACTIVE:
primary_dc = backup_dc
break
else:
raise Exception(f"区域 {region_id} 没有可用的数据中心")
# 模拟请求处理
processing_time = random.uniform(0.05, 0.2) # 50-200ms
await asyncio.sleep(processing_time)
# 更新负载
region.current_load += 1
# 模拟响应
response = {
'status': 'success',
'region_id': region_id,
'datacenter_id': primary_dc.datacenter_id,
'processing_time': time.time() - start_time,
'data': f"请求在区域 {region.region_name} 处理完成"
}
return response
def get_global_cluster_stats(self) -> Dict[str, Any]:
"""获取全球集群统计"""
stats = {
'total_regions': len(self.regions),
'active_regions': len([r for r in self.regions.values() if r.status == RegionStatus.ACTIVE]),
'total_datacenters': len(self.datacenters),
'active_datacenters': len([dc for dc in self.datacenters.values() if dc.status == RegionStatus.ACTIVE]),
'global_load': sum(r.current_load for r in self.regions.values()),
'global_capacity': sum(r.max_capacity for r in self.regions.values()),
'region_details': {},
'latency_matrix': self.latency_matrix,
'routing_table': self.routing_table
}
# 区域详细信息
for region_id, region in self.regions.items():
stats['region_details'][region_id] = {
'name': region.region_name,
'status': region.status.value,
'current_load': region.current_load,
'max_capacity': region.max_capacity,
'load_percentage': (region.current_load / region.max_capacity) * 100,
'coordinates': {'lat': region.latitude, 'lng': region.longitude},
'datacenters': len([dc for dc in self.datacenters.values() if dc.region_id == region_id])
}
return stats
class GlobalFailureDetector:
"""全球故障检测器"""
def __init__(self):
self.monitoring_task = None
self.failure_callbacks = []
async def start(self):
"""启动故障检测"""
self.monitoring_task = asyncio.create_task(self._monitoring_loop())
async def _monitoring_loop(self):
"""监控循环"""
while True:
try:
await asyncio.sleep(30) # 每30秒检查一次
await self._check_global_health()
except Exception as e:
print(f"全球故障检测错误: {e}")
async def _check_global_health(self):
"""检查全球健康状态"""
# 简化实现:随机模拟故障
if random.random() < 0.05: # 5%概率模拟故障
region_ids = ['us-east-1', 'us-west-1', 'eu-west-1', 'ap-southeast-1', 'ap-northeast-1']
failed_region = random.choice(region_ids)
print(f"检测到区域故障: {failed_region}")
# 触发故障回调
for callback in self.failure_callbacks:
await callback(failed_region)
def add_failure_callback(self, callback):
"""添加故障回调"""
self.failure_callbacks.append(callback)
class DisasterRecoveryManager:
"""灾难恢复管理器"""
def __init__(self):
self.recovery_procedures = {}
self.backup_strategies = {}
async def start(self):
"""启动灾难恢复管理器"""
print("灾难恢复管理器已启动")
async def handle_region_failure(self, failed_region_id: str):
"""处理区域故障"""
print(f"开始处理区域故障: {failed_region_id}")
# 执行故障转移
await self._execute_failover(failed_region_id)
# 启动恢复程序
await self._start_recovery_procedure(failed_region_id)
async def _execute_failover(self, failed_region_id: str):
"""执行故障转移"""
print(f"执行区域 {failed_region_id} 的故障转移")
# 重新路由流量到其他区域
# 这里应该实现具体的流量转移逻辑
await asyncio.sleep(1) # 模拟转移时间
print(f"区域 {failed_region_id} 故障转移完成")
async def _start_recovery_procedure(self, failed_region_id: str):
"""启动恢复程序"""
print(f"启动区域 {failed_region_id} 的恢复程序")
# 模拟恢复过程
await asyncio.sleep(5) # 模拟恢复时间
print(f"区域 {failed_region_id} 恢复完成")
class GlobalDataSynchronizer:
"""全球数据同步器"""
def __init__(self):
self.sync_task = None
async def start(self):
"""启动数据同步"""
self.sync_task = asyncio.create_task(self._sync_loop())
async def _sync_loop(self):
"""同步循环"""
while True:
try:
await asyncio.sleep(300) # 每5分钟同步一次
await self._sync_global_data()
except Exception as e:
print(f"全球数据同步错误: {e}")
async def _sync_global_data(self):
"""同步全球数据"""
# 这里实现具体的数据同步逻辑
# 例如:配置同步、状态同步、缓存同步等
pass
class GlobalClusterMonitor:
"""全球集群监控"""
def __init__(self):
self.monitoring_task = None
async def start(self):
"""启动全球监控"""
self.monitoring_task = asyncio.create_task(self._monitoring_loop())
async def _monitoring_loop(self):
"""监控循环"""
while True:
try:
await asyncio.sleep(60) # 每分钟监控一次
await self._collect_global_metrics()
except Exception as e:
print(f"全球集群监控错误: {e}")
async def _collect_global_metrics(self):
"""收集全球指标"""
# 收集各区域的性能指标
# 生成全球性能报告
pass
# 全球分布式部署示例
async def global_deployment_example():
"""全球分布式部署示例"""
# 创建全球集群管理器
global_manager = GlobalProxyClusterManager()
# 初始化全球集群
await global_manager.initialize_global_cluster()
# 添加故障恢复回调
global_manager.failure_detector.add_failure_callback(
global_manager.disaster_recovery.handle_region_failure
)
# 模拟不同地区的客户端请求
client_locations = [
{'latitude': 40.7128, 'longitude': -74.0060, 'location': '纽约'},
{'latitude': 37.7749, 'longitude': -122.4194, 'location': '旧金山'},
{'latitude': 51.5074, 'longitude': -0.1278, 'location': '伦敦'},
{'latitude': 1.3521, 'longitude': 103.8198, 'location': '新加坡'},
{'latitude': 35.6762, 'longitude': 139.6503, 'location': '东京'}
]
# 测试请求路由
print("\n测试全球请求路由:")
print("=" * 60)
for client_location in client_locations:
try:
request_data = {
'method': 'GET',
'url': '/api/data',
'headers': {'User-Agent': 'GlobalClient'}
}
result = await global_manager.route_request(client_location, request_data)
print(f"客户端位置: {client_location['location']}")
print(f" 处理区域: {result['processed_region']}")
print(f" 处理时间: {result['processing_time']:.3f}s")
if 'fallback_from' in result:
print(f" 故障转移自: {result['fallback_from']}")
print()
except Exception as e:
print(f"请求处理失败 ({client_location['location']}): {e}")
# 显示全球集群统计
stats = global_manager.get_global_cluster_stats()
print("全球集群统计:")
print("=" * 60)
print(f"总区域数: {stats['total_regions']}")
print(f"活跃区域: {stats['active_regions']}")
print(f"总数据中心: {stats['total_datacenters']}")
print(f"活跃数据中心: {stats['active_datacenters']}")
print(f"全球负载: {stats['global_load']}")
print(f"全球容量: {stats['global_capacity']}")
print(f"负载利用率: {(stats['global_load'] / stats['global_capacity']) * 100:.1f}%")
print(f"\n区域详情:")
for region_id, details in stats['region_details'].items():
print(f" {region_id}: {details['name']}")
print(f" 状态: {details['status']}")
print(f" 负载: {details['current_load']}/{details['max_capacity']} ({details['load_percentage']:.1f}%)")
print(f" 坐标: ({details['coordinates']['lat']:.4f}, {details['coordinates']['lng']:.4f})")
print()
# 显示延迟矩阵
print("区域间延迟矩阵 (ms):")
regions = list(stats['region_details'].keys())
print(f"{'':12}", end='')
for region in regions:
print(f"{region:12}", end='')
print()
for region1 in regions:
print(f"{region1:12}", end='')
for region2 in regions:
latency = stats['latency_matrix'].get((region1, region2), 0)
print(f"{latency:10.1f} ", end='')
print()
# 运行一段时间以观察监控和故障恢复
print(f"\n全球集群运行中,观察故障检测和恢复...")
await asyncio.sleep(60)
if __name__ == "__main__":
asyncio.run(global_deployment_example())
📊 System Monitoring and Operations
6.1 Full-Link Monitoring
Build a complete monitoring and observability system so that problems can be detected and localized quickly. A minimal metrics-collection sketch follows.
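Before enumerating the metrics, here is a minimal sketch of how they could be collected. It is an illustration rather than the production design of the cluster above: the MetricsCollector class, its sliding window, and the nearest-rank P99 calculation are all assumptions made for this example.
import time
from collections import deque

class MetricsCollector:
    """Minimal sliding-window metrics collector (illustrative sketch).

    Records one sample per request and derives QPS, average/P99 latency,
    and error rate over the most recent `window_seconds`.
    """

    def __init__(self, window_seconds: int = 60):
        self.window_seconds = window_seconds
        # Each sample: (timestamp, latency_seconds, success_flag)
        self._samples: deque = deque()

    def record(self, latency: float, success: bool) -> None:
        now = time.time()
        self._samples.append((now, latency, success))
        # Drop samples that have fallen out of the window
        while self._samples and now - self._samples[0][0] > self.window_seconds:
            self._samples.popleft()

    def snapshot(self) -> dict:
        if not self._samples:
            return {'qps': 0.0, 'avg_latency': 0.0, 'p99_latency': 0.0, 'error_rate': 0.0}
        latencies = sorted(s[1] for s in self._samples)
        failures = sum(1 for s in self._samples if not s[2])
        n = len(latencies)
        # Nearest-rank P99: the value below which ~99% of samples fall
        p99_index = min(n - 1, int(n * 0.99))
        return {
            'qps': n / self.window_seconds,
            'avg_latency': sum(latencies) / n,
            'p99_latency': latencies[p99_index],
            'error_rate': failures / n,
        }

# Usage: call record() around each proxied request
collector = MetricsCollector(window_seconds=60)
collector.record(latency=0.042, success=True)
collector.record(latency=0.310, success=False)
print(collector.snapshot())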
6.2 Core Monitoring Metrics
System performance metrics:
- QPS (queries per second)
- Average response time and P99 latency
- Error rate and success rate
- Number of concurrent connections
Resource utilization metrics:
- CPU, memory, and disk usage
- Network bandwidth utilization
- Connection pool efficiency
- Cache hit rate
Business metrics (see the health-score sketch below):
- Proxy node health
- Traffic distribution by region
- User request patterns
- Cost-effectiveness analysis
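Business metrics such as node health are usually derived rather than measured directly. As one hedged illustration, a composite health score might blend success rate, latency, and load headroom; the weights and the 100 ms latency SLO below are assumptions for this sketch, not values taken from the system above.
def node_health_score(success_rate: float,
                      avg_latency: float,
                      load_ratio: float,
                      latency_slo: float = 0.1) -> float:
    """Composite health score in [0, 1] (illustrative; weights are assumptions).

    success_rate: fraction of successful requests (0..1)
    avg_latency:  average response time in seconds
    load_ratio:   current_connections / max_connections (0..1)
    latency_slo:  latency considered "fully healthy" (here 100 ms)
    """
    # Latency component: 1.0 at or below the SLO, decaying toward 0 beyond it
    latency_component = min(1.0, latency_slo / avg_latency) if avg_latency > 0 else 1.0
    # Headroom component: a fully loaded node scores 0
    headroom = max(0.0, 1.0 - load_ratio)
    # Weighted blend; success rate dominates
    return 0.5 * success_rate + 0.3 * latency_component + 0.2 * headroom

# Example: a node with 99% success, 80 ms average latency, 40% load -> ~0.915
print(f"{node_health_score(0.99, 0.08, 0.40):.3f}")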
🎯 Best Practices and Summary
Core Technical Takeaways
This article has walked through the complete technology stack of a distributed IP proxy cluster:
Architecture design principles
- Layered architecture with clear separation of responsibilities
- Microservice design so each component scales independently
- High-availability design with fault isolation
- Horizontal scaling with elastic capacity
Intelligent scheduling algorithms
- Multiple strategies fused and selected dynamically
- Machine-learning optimization for continuous improvement
- Geo-aware routing that serves clients from the nearest region
- Load balancing tuned for best performance
Performance optimization techniques
- Connection pool management for resource reuse
- Caching strategies to reduce latency
- Asynchronous processing to raise throughput
- Batching to cut per-request overhead
Global distributed deployment
- Multi-region deployment for proximity-based service
- Failure detection with automatic recovery
- Data synchronization with consistency guarantees
- Disaster recovery for business continuity
Best Practice Recommendations
Architecture design
- Modular design: adopt a microservice architecture so components can be developed, deployed, and scaled independently
- Standardized interfaces: define clear API contracts that multi-language clients can consume
- Configuration center: centralize configuration management and support dynamic updates (a polling sketch follows this list)
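To make the configuration-center recommendation concrete, the sketch below polls a configuration source and notifies subscribers when a value changes. It is a minimal illustration: the ConfigWatcher class and its fetch_config callback are assumptions, and a real deployment would more likely use a store with native watch support such as etcd or Consul.
import asyncio
from typing import Any, Callable, Dict

class ConfigWatcher:
    """Poll-based dynamic configuration sketch (illustrative).

    `fetch_config` is a placeholder coroutine supplied by the caller;
    subscribers are invoked with (key, new_value) whenever a key changes.
    """

    def __init__(self, fetch_config: Callable, poll_interval: float = 10.0):
        self._fetch_config = fetch_config
        self._poll_interval = poll_interval
        self._current: Dict[str, Any] = {}
        self._subscribers = []

    def subscribe(self, callback: Callable[[str, Any], None]) -> None:
        self._subscribers.append(callback)

    async def run(self) -> None:
        while True:
            new_config = await self._fetch_config()
            for key, value in new_config.items():
                if self._current.get(key) != value:
                    self._current[key] = value
                    for callback in self._subscribers:
                        callback(key, value)
            await asyncio.sleep(self._poll_interval)

# Usage: react to pool-size changes without restarting the service
async def fake_fetch():
    return {'max_connections': 80}

watcher = ConfigWatcher(fake_fetch, poll_interval=5.0)
watcher.subscribe(lambda k, v: print(f"config updated: {k} -> {v}"))
# asyncio.create_task(watcher.run())  # started alongside the cluster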
Performance optimization
- Connection pool tuning: size pool parameters to the workload, balancing resource usage against performance (see the sizing helper after this list)
- Caching strategy: design multi-level caches to cut response times
- Asynchronous processing: use async I/O heavily to raise system throughput
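A practical starting point for pool sizing is Little's law: the expected number of in-flight requests is roughly the target QPS multiplied by the average response time. The helper below applies that rule with a safety margin; the 1.5x margin and the hard cap are assumptions chosen for illustration.
def suggest_pool_size(target_qps: float,
                      avg_latency_seconds: float,
                      safety_factor: float = 1.5,
                      hard_cap: int = 1000) -> int:
    """Estimate connection pool size via Little's law (L = lambda * W).

    With lambda = target_qps and W = avg_latency_seconds, L is the expected
    number of concurrent in-flight requests; the safety factor absorbs
    bursts. The result is clamped to a sane range.
    """
    in_flight = target_qps * avg_latency_seconds
    return max(1, min(hard_cap, int(in_flight * safety_factor) + 1))

# Example: 500 QPS at 80 ms average latency
# L = 500 * 0.08 = 40 in-flight; with a 1.5x margin -> 61 connections
print(suggest_pool_size(target_qps=500, avg_latency_seconds=0.08))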
Reliability
- Health checks: monitor node health in real time so failures are detected and handled promptly
- Failover: design thorough failover mechanisms to keep the service highly available
- Rate limiting and degradation: apply traffic control and graceful degradation to protect system stability (a token-bucket sketch follows this list)
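For the rate-limiting recommendation, the token bucket is the standard building block. The sketch below is a minimal synchronous variant; the refill rate and burst capacity shown are placeholder values, not tuning advice.
import time

class TokenBucket:
    """Minimal token-bucket rate limiter (illustrative sketch)."""

    def __init__(self, rate: float, capacity: float):
        self.rate = rate            # tokens added per second
        self.capacity = capacity    # maximum burst size
        self._tokens = capacity
        self._last_refill = time.monotonic()

    def allow(self) -> bool:
        now = time.monotonic()
        # Refill proportionally to elapsed time, capped at capacity
        self._tokens = min(self.capacity,
                           self._tokens + (now - self._last_refill) * self.rate)
        self._last_refill = now
        if self._tokens >= 1.0:
            self._tokens -= 1.0
            return True
        return False

# Usage: reject (or degrade) requests beyond 100 req/s with bursts up to 200
limiter = TokenBucket(rate=100.0, capacity=200.0)
if not limiter.allow():
    print("rate limited: serve a degraded response or queue the request")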
Monitoring and operations
- Full-link monitoring: build a complete monitoring system that covers every key metric
- Intelligent alerting: use statistical or ML-based alerting to reduce false positives (see the z-score sketch after this list)
- Automated operations: automate deployment, scaling, and failure handling
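Intelligent alerting can start far simpler than full machine learning: flag a metric when it drifts several standard deviations away from its recent baseline. The z-score detector below is one such heuristic; the window length, warm-up count, and 3-sigma threshold are assumptions for the sketch.
from collections import deque
import statistics

class ZScoreAlert:
    """Alert when a metric deviates strongly from its recent baseline."""

    def __init__(self, window: int = 120, threshold: float = 3.0):
        self._history: deque = deque(maxlen=window)
        self._threshold = threshold

    def observe(self, value: float) -> bool:
        """Return True if `value` is anomalous relative to the window."""
        anomalous = False
        if len(self._history) >= 30:  # need enough samples for a baseline
            mean = statistics.fmean(self._history)
            stdev = statistics.pstdev(self._history)
            if stdev > 0 and abs(value - mean) / stdev > self._threshold:
                anomalous = True
        self._history.append(value)
        return anomalous

# Usage: feed per-minute P99 latency; alert on sudden spikes
alert = ZScoreAlert(window=120, threshold=3.0)
for p99 in [0.08, 0.09, 0.085] * 20 + [0.50]:
    if alert.observe(p99):
        print(f"ALERT: P99 latency anomaly: {p99:.3f}s")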
Security and compliance
- Access control: enforce strict authentication and authorization
- Data encryption: encrypt sensitive data in transit and at rest
- Compliance auditing: run regular security audits to stay compliant with applicable regulations
Technology Trends
- Cloud native: embrace containers and Kubernetes across the board to streamline deployment and management
- AI-driven operations: apply AI to automated operations, predictive maintenance, and intelligent tuning
- Edge computing: push capacity toward the edge to lower latency and improve user experience
- Service mesh: adopt Service Mesh technology to simplify inter-service communication management
Performance Reference Targets
Enterprise deployment scale:
- 100,000+ concurrent connections
- 99.99% availability
- Millisecond-level response times
- Petabyte-scale data processing capacity
Key performance indicators:
- QPS: 50,000+ requests/second
- Latency: P99 < 100 ms
- Error rate: < 0.01%
- Recovery time: < 30 seconds
By applying these best practices and technical approaches, you can build a distributed IP proxy cluster that is performant, highly available, and scalable enough to meet demanding enterprise requirements.
💡 Closing Thoughts
Distributed IP proxy clusters sit at the leading edge of modern internet infrastructure. The evolution from single-node services to globally distributed clusters, and from simple load balancing to intelligent scheduling, reflects a relentless pursuit of performance, reliability, and user experience.
Mastering these core techniques not only solves today's business needs but also lays a solid foundation for what comes next. As cloud-native platforms, artificial intelligence, and edge computing continue to mature, distributed proxy systems will keep evolving and underpin ever smarter, more efficient internet infrastructure.
This article, "Distributed IP Proxy Cluster Architecture and Intelligent Scheduling," has presented a complete enterprise-grade solution: from architecture design to algorithm implementation, and from performance optimization to global deployment, each chapter combines industry best practices with current techniques to guide the construction of highly available, high-performance distributed proxy systems.
📝 Copyright: This is an original technical article; please credit the source when republishing
🔗 Related articles: Part 6 of the IP proxy technology series
💬 Discussion: questions about distributed architecture are welcome in the comments