028_Distributed_Deployment_Architecture
Overview
This document describes how to design and implement a distributed deployment architecture for Claude applications, covering load balancing, caching strategies, service discovery, and fault-tolerance mechanisms.
Microservice Architecture Design
1. Service Decomposition Strategy
from abc import ABC, abstractmethod
from typing import Dict, Any, Optional
import asyncio
import aiohttp
import json
from dataclasses import dataclass
from enum import Enum
class ServiceType(Enum):
GATEWAY = "gateway"
AUTH = "auth"
CONVERSATION = "conversation"
TRANSLATION = "translation"
CONTENT_FILTER = "content_filter"
CACHE = "cache"
METRICS = "metrics"
@dataclass
class ServiceConfig:
name: str
service_type: ServiceType
host: str
port: int
health_check_path: str = "/health"
version: str = "1.0.0"
replicas: int = 1
max_requests_per_second: int = 100
class BaseService(ABC):
def __init__(self, config: ServiceConfig):
self.config = config
self.is_healthy = True
self.metrics = {
'requests_processed': 0,
'errors_count': 0,
'avg_response_time': 0
}
@abstractmethod
async def start(self):
"""启动服务"""
pass
@abstractmethod
async def stop(self):
"""停止服务"""
pass
@abstractmethod
async def health_check(self) -> Dict[str, Any]:
"""健康检查"""
pass
async def process_request(self, request: Dict[str, Any]) -> Dict[str, Any]:
"""处理请求的通用包装器"""
start_time = asyncio.get_event_loop().time()
try:
self.metrics['requests_processed'] += 1
result = await self._process_request_impl(request)
            # Update the response time
response_time = asyncio.get_event_loop().time() - start_time
self._update_avg_response_time(response_time)
return {
'status': 'success',
'data': result,
'service': self.config.name,
'response_time': response_time
}
except Exception as e:
self.metrics['errors_count'] += 1
response_time = asyncio.get_event_loop().time() - start_time
return {
'status': 'error',
'error': str(e),
'service': self.config.name,
'response_time': response_time
}
@abstractmethod
async def _process_request_impl(self, request: Dict[str, Any]) -> Any:
"""实际的请求处理逻辑"""
pass
def _update_avg_response_time(self, new_time: float):
"""更新平均响应时间"""
current_avg = self.metrics['avg_response_time']
total_requests = self.metrics['requests_processed']
        # Simple running average
self.metrics['avg_response_time'] = (
(current_avg * (total_requests - 1) + new_time) / total_requests
)
# Claude service implementation
class ClaudeService(BaseService):
def __init__(self, config: ServiceConfig, anthropic_client):
super().__init__(config)
self.client = anthropic_client
self.conversation_manager = None
async def start(self):
"""启动Claude服务"""
# 初始化必要的组件
self.is_healthy = True
print(f"Claude service {self.config.name} started on {self.config.host}:{self.config.port}")
async def stop(self):
"""停止Claude服务"""
self.is_healthy = False
print(f"Claude service {self.config.name} stopped")
async def health_check(self) -> Dict[str, Any]:
"""健康检查"""
# 检查与Claude API的连接
try:
# 简单的测试请求
response = await self._make_test_request()
return {
'status': 'healthy',
'service': self.config.name,
'version': self.config.version,
'metrics': self.metrics,
'api_connection': 'ok'
}
except Exception as e:
self.is_healthy = False
return {
'status': 'unhealthy',
'service': self.config.name,
'error': str(e)
}
    async def _process_request_impl(self, request: Dict[str, Any]) -> Any:
        """Handle a Claude request."""
        messages = request.get('messages', [])
        model = request.get('model', 'claude-3-5-sonnet-20241022')
        max_tokens = request.get('max_tokens', 1000)
        # Call the Claude API (assumes an async client such as anthropic.AsyncAnthropic;
        # a synchronous client would block the event loop and should be wrapped
        # in asyncio.to_thread instead)
        response = await self.client.messages.create(
            model=model,
            messages=messages,
            max_tokens=max_tokens
        )
return {
'content': response.content[0].text,
'usage': {
'input_tokens': response.usage.input_tokens,
'output_tokens': response.usage.output_tokens
}
}
    async def _make_test_request(self):
        """Send a lightweight test request (assumes an async Anthropic client)."""
        return await self.client.messages.create(
            model="claude-3-5-sonnet-20241022",
            messages=[{"role": "user", "content": "test"}],
            max_tokens=10
        )
# Authentication service
class AuthService(BaseService):
def __init__(self, config: ServiceConfig):
super().__init__(config)
        self.api_keys = {}  # in production, use a database
self.rate_limits = {}
async def start(self):
self.is_healthy = True
print(f"Auth service started on {self.config.host}:{self.config.port}")
async def stop(self):
self.is_healthy = False
async def health_check(self) -> Dict[str, Any]:
return {
'status': 'healthy',
'service': self.config.name,
'version': self.config.version,
'metrics': self.metrics
}
async def _process_request_impl(self, request: Dict[str, Any]) -> Any:
"""处理认证请求"""
api_key = request.get('api_key')
action = request.get('action', 'validate')
if action == 'validate':
return await self._validate_api_key(api_key)
elif action == 'check_rate_limit':
return await self._check_rate_limit(api_key)
else:
raise ValueError(f"Unknown action: {action}")
    async def _validate_api_key(self, api_key: str) -> Dict[str, Any]:
        """Validate an API key."""
        # Simplified validation logic
        if api_key and api_key.startswith('sk-'):
            return {
                'valid': True,
                'user_id': f"user_{hash(api_key) % 10000}",
                'tier': 'standard'
            }
        else:
            return {'valid': False}
    async def _check_rate_limit(self, api_key: str) -> Dict[str, Any]:
        """Check the rate limit."""
        # Simplified rate-limit check
        current_count = self.rate_limits.get(api_key, 0)
        if current_count < 100:  # assumes a limit of 100 requests/hour
            self.rate_limits[api_key] = current_count + 1
            return {
                'allowed': True,
                'remaining': 100 - current_count - 1
            }
        else:
            return {
                'allowed': False,
                'remaining': 0,
                'reset_time': 3600  # resets after one hour
            }
# Cache service
class CacheService(BaseService):
def __init__(self, config: ServiceConfig):
super().__init__(config)
        self.cache = {}  # in production, use Redis
self.ttl = {}
async def start(self):
self.is_healthy = True
print(f"Cache service started on {self.config.host}:{self.config.port}")
async def stop(self):
self.is_healthy = False
async def health_check(self) -> Dict[str, Any]:
return {
'status': 'healthy',
'service': self.config.name,
'cache_size': len(self.cache),
'metrics': self.metrics
}
async def _process_request_impl(self, request: Dict[str, Any]) -> Any:
"""处理缓存请求"""
action = request.get('action')
key = request.get('key')
if action == 'get':
return await self._get(key)
elif action == 'set':
value = request.get('value')
ttl = request.get('ttl', 3600)
return await self._set(key, value, ttl)
elif action == 'delete':
return await self._delete(key)
else:
raise ValueError(f"Unknown action: {action}")
    async def _get(self, key: str) -> Dict[str, Any]:
        """Get a cached value."""
        # Lazily evict expired entries
        expiry = self.ttl.get(key)
        if expiry is not None and asyncio.get_event_loop().time() > expiry:
            self.cache.pop(key, None)
            self.ttl.pop(key, None)
        if key in self.cache:
            return {
                'found': True,
                'value': self.cache[key]
            }
        else:
            return {'found': False}
    async def _set(self, key: str, value: Any, ttl: int) -> Dict[str, Any]:
        """Set a cached value."""
        self.cache[key] = value
        # Record the expiry time (simplified TTL handling; entries are evicted lazily on read)
        self.ttl[key] = asyncio.get_event_loop().time() + ttl
        return {'success': True}
    async def _delete(self, key: str) -> Dict[str, Any]:
        """Delete a cached value."""
        self.ttl.pop(key, None)
        if key in self.cache:
            del self.cache[key]
            return {'deleted': True}
        else:
            return {'deleted': False}
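The sketch below shows how these services might be wired together in a single process. It is a minimal illustration under stated assumptions, not part of the framework above: the AsyncAnthropic client (which reads ANTHROPIC_API_KEY from the environment), the ports, and the 'sk-test' key are all placeholders, and the Claude call would hit the live API.
from anthropic import AsyncAnthropic  # assumed async client

async def main():
    auth = AuthService(ServiceConfig(
        name="auth", service_type=ServiceType.AUTH,
        host="127.0.0.1", port=8081
    ))
    claude = ClaudeService(
        ServiceConfig(
            name="claude", service_type=ServiceType.CONVERSATION,
            host="127.0.0.1", port=8082
        ),
        anthropic_client=AsyncAnthropic()
    )
    await auth.start()
    await claude.start()
    # Authenticate first, then forward the request to the Claude service
    auth_result = await auth.process_request({'api_key': 'sk-test', 'action': 'validate'})
    if auth_result['status'] == 'success' and auth_result['data']['valid']:
        reply = await claude.process_request({
            'messages': [{'role': 'user', 'content': 'Hello'}]
        })
        print(reply['status'], reply.get('data', {}).get('content'))
    await claude.stop()
    await auth.stop()

asyncio.run(main())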
2. Service Discovery and Registration
import asyncio
import random
import aiohttp
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, List, Optional
class ServiceRegistry:
def __init__(self):
self.services = {} # service_name -> [ServiceInstance]
        self.health_check_interval = 30  # seconds
self.health_check_task = None
async def start(self):
"""启动服务注册中心"""
self.health_check_task = asyncio.create_task(
self._periodic_health_check()
)
print("Service registry started")
async def stop(self):
"""停止服务注册中心"""
if self.health_check_task:
self.health_check_task.cancel()
print("Service registry stopped")
def register_service(
self,
service_name: str,
instance_id: str,
host: str,
port: int,
        metadata: Optional[Dict[str, Any]] = None
):
"""注册服务实例"""
instance = ServiceInstance(
service_name=service_name,
instance_id=instance_id,
host=host,
port=port,
metadata=metadata or {}
)
if service_name not in self.services:
self.services[service_name] = []
        # Remove any existing instance with the same id
self.services[service_name] = [
s for s in self.services[service_name]
if s.instance_id != instance_id
]
        # Add the new instance
self.services[service_name].append(instance)
print(f"Service registered: {service_name}/{instance_id}")
def deregister_service(self, service_name: str, instance_id: str):
"""注销服务实例"""
if service_name in self.services:
self.services[service_name] = [
s for s in self.services[service_name]
if s.instance_id != instance_id
]
            # Drop the service entirely once no instances remain
if not self.services[service_name]:
del self.services[service_name]
print(f"Service deregistered: {service_name}/{instance_id}")
def discover_services(self, service_name: str) -> List['ServiceInstance']:
"""发现服务实例"""
return [
instance for instance in self.services.get(service_name, [])
if instance.is_healthy
]
def get_all_services(self) -> Dict[str, List['ServiceInstance']]:
"""获取所有服务"""
return self.services.copy()
async def _periodic_health_check(self):
"""定期健康检查"""
while True:
try:
await self._check_all_services_health()
await asyncio.sleep(self.health_check_interval)
except asyncio.CancelledError:
break
except Exception as e:
print(f"Health check error: {e}")
await asyncio.sleep(5)
async def _check_all_services_health(self):
"""检查所有服务的健康状态"""
tasks = []
for service_name, instances in self.services.items():
for instance in instances:
task = self._check_instance_health(instance)
tasks.append(task)
if tasks:
await asyncio.gather(*tasks, return_exceptions=True)
async def _check_instance_health(self, instance: 'ServiceInstance'):
"""检查单个实例的健康状态"""
try:
async with aiohttp.ClientSession() as session:
url = f"http://{instance.host}:{instance.port}/health"
async with session.get(
url,
timeout=aiohttp.ClientTimeout(total=5)
) as response:
if response.status == 200:
instance.mark_healthy()
else:
instance.mark_unhealthy()
except Exception:
instance.mark_unhealthy()
@dataclass
class ServiceInstance:
service_name: str
instance_id: str
host: str
port: int
metadata: Dict[str, Any]
is_healthy: bool = True
    last_health_check: Optional[datetime] = None
consecutive_failures: int = 0
def __post_init__(self):
self.last_health_check = datetime.now()
def mark_healthy(self):
"""标记为健康"""
self.is_healthy = True
self.consecutive_failures = 0
self.last_health_check = datetime.now()
    def mark_unhealthy(self):
        """Mark the instance unhealthy."""
        self.consecutive_failures += 1
        # Only mark unhealthy after three consecutive failures
if self.consecutive_failures >= 3:
self.is_healthy = False
self.last_health_check = datetime.now()
def get_endpoint(self) -> str:
"""获取服务端点"""
return f"http://{self.host}:{self.port}"
# Load balancer
class LoadBalancer:
def __init__(self, service_registry: ServiceRegistry):
self.registry = service_registry
self.strategies = {
'round_robin': self._round_robin,
'random': self._random_selection,
'least_connections': self._least_connections,
'weighted': self._weighted_selection
}
self.round_robin_counters = {}
def select_instance(
self,
service_name: str,
strategy: str = 'round_robin'
) -> Optional[ServiceInstance]:
"""选择服务实例"""
instances = self.registry.discover_services(service_name)
if not instances:
return None
if len(instances) == 1:
return instances[0]
selection_func = self.strategies.get(strategy, self._round_robin)
return selection_func(service_name, instances)
def _round_robin(
self,
service_name: str,
instances: List[ServiceInstance]
) -> ServiceInstance:
"""轮询策略"""
if service_name not in self.round_robin_counters:
self.round_robin_counters[service_name] = 0
index = self.round_robin_counters[service_name] % len(instances)
self.round_robin_counters[service_name] += 1
return instances[index]
def _random_selection(
self,
service_name: str,
instances: List[ServiceInstance]
) -> ServiceInstance:
"""随机选择策略"""
import random
return random.choice(instances)
def _least_connections(
self,
service_name: str,
instances: List[ServiceInstance]
) -> ServiceInstance:
"""最少连接策略(简化版)"""
# 这里简化为随机选择,实际应跟踪连接数
return self._random_selection(service_name, instances)
def _weighted_selection(
self,
service_name: str,
instances: List[ServiceInstance]
) -> ServiceInstance:
"""加权选择策略"""
# 基于实例的元数据中的权重
weights = []
for instance in instances:
weight = instance.metadata.get('weight', 1)
weights.append(weight)
        total_weight = sum(weights)
        r = random.uniform(0, total_weight)
cumulative = 0
for i, weight in enumerate(weights):
cumulative += weight
if r <= cumulative:
return instances[i]
return instances[0]
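A brief usage sketch (hosts, ports, and weights are illustrative placeholders): register two instances, then pick one with different strategies.
async def demo_discovery():
    registry = ServiceRegistry()
    await registry.start()
    registry.register_service("claude-service", "inst-1", "10.0.0.1", 8080,
                              metadata={"weight": 3})
    registry.register_service("claude-service", "inst-2", "10.0.0.2", 8080,
                              metadata={"weight": 1})
    lb = LoadBalancer(registry)
    # Round-robin alternates across healthy instances
    for _ in range(4):
        print(lb.select_instance("claude-service", strategy="round_robin").get_endpoint())
    # Weighted selection favors inst-1 roughly 3:1 over many calls
    print(lb.select_instance("claude-service", strategy="weighted").instance_id)
    await registry.stop()

asyncio.run(demo_discovery())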
Caching and Storage Strategy
1. Distributed Cache System
import asyncio
import hashlib
import json
import pickle
import redis
from datetime import datetime
from typing import Any, Dict, List, Optional
class DistributedCacheManager:
def __init__(
self,
redis_hosts: List[str],
cache_prefix: str = "claude_app",
default_ttl: int = 3600
):
self.cache_prefix = cache_prefix
self.default_ttl = default_ttl
        # Create one Redis client per host (note: the synchronous redis client
        # blocks the event loop; production code would use redis.asyncio)
self.redis_clients = []
for host in redis_hosts:
host_parts = host.split(':')
redis_host = host_parts[0]
redis_port = int(host_parts[1]) if len(host_parts) > 1 else 6379
client = redis.Redis(
host=redis_host,
port=redis_port,
                decode_responses=False,  # values are stored as binary (pickle)
socket_connect_timeout=5,
socket_timeout=5
)
self.redis_clients.append(client)
def _get_client(self, key: str) -> redis.Redis:
"""基于key选择Redis客户端(一致性哈希)"""
hash_value = int(hashlib.md5(key.encode()).hexdigest(), 16)
index = hash_value % len(self.redis_clients)
return self.redis_clients[index]
def _make_key(self, key: str) -> str:
"""生成缓存键"""
return f"{self.cache_prefix}:{key}"
async def get(self, key: str) -> Optional[Any]:
"""获取缓存值"""
cache_key = self._make_key(key)
client = self._get_client(cache_key)
try:
data = client.get(cache_key)
if data is None:
return None
            # Deserialize the payload
return pickle.loads(data)
except Exception as e:
print(f"Cache get error for key {key}: {e}")
return None
async def set(
self,
key: str,
value: Any,
ttl: Optional[int] = None
) -> bool:
"""设置缓存值"""
cache_key = self._make_key(key)
client = self._get_client(cache_key)
try:
            # Serialize the payload
serialized_value = pickle.dumps(value)
            # Apply the TTL
expire_time = ttl or self.default_ttl
return client.setex(cache_key, expire_time, serialized_value)
except Exception as e:
print(f"Cache set error for key {key}: {e}")
return False
async def delete(self, key: str) -> bool:
"""删除缓存值"""
cache_key = self._make_key(key)
client = self._get_client(cache_key)
try:
return bool(client.delete(cache_key))
except Exception as e:
print(f"Cache delete error for key {key}: {e}")
return False
async def exists(self, key: str) -> bool:
"""检查键是否存在"""
cache_key = self._make_key(key)
client = self._get_client(cache_key)
try:
return bool(client.exists(cache_key))
except Exception as e:
print(f"Cache exists error for key {key}: {e}")
return False
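# Example usage (a sketch; the Redis hosts are placeholders and assumed reachable):
async def demo_cache():
    cache = DistributedCacheManager(
        redis_hosts=['10.0.0.10:6379', '10.0.0.11:6379'],
        cache_prefix='claude_app'
    )
    await cache.set('greeting', {'text': 'hello'}, ttl=60)
    print(await cache.get('greeting'))     # {'text': 'hello'}
    print(await cache.exists('greeting'))  # True
# asyncio.run(demo_cache())  # requires running Redis nodes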
# Smart caching strategy
class SmartCacheStrategy:
def __init__(self, cache_manager: DistributedCacheManager):
self.cache_manager = cache_manager
        self.cache_policies = {
            'conversation': {'ttl': 1800, 'compress': True},   # 30 minutes
            'translation': {'ttl': 7200, 'compress': False},   # 2 hours
            'user_profile': {'ttl': 3600, 'compress': False},  # 1 hour
            'api_response': {'ttl': 300, 'compress': True}     # 5 minutes
        }
async def cache_conversation(
self,
conversation_id: str,
messages: List[Dict],
user_id: str
) -> bool:
"""缓存对话数据"""
key = f"conversation:{conversation_id}"
        # Optionally compress large conversations
policy = self.cache_policies['conversation']
data = {
'messages': messages,
'user_id': user_id,
'cached_at': datetime.now().isoformat()
}
if policy['compress'] and len(json.dumps(data)) > 10000:
data = self._compress_conversation(data)
return await self.cache_manager.set(
key,
data,
ttl=policy['ttl']
)
async def get_conversation(
self,
conversation_id: str
) -> Optional[Dict]:
"""获取缓存的对话"""
key = f"conversation:{conversation_id}"
data = await self.cache_manager.get(key)
if data and 'compressed' in data:
data = self._decompress_conversation(data)
return data
async def cache_api_response(
self,
request_hash: str,
response: Dict[str, Any]
) -> bool:
"""缓存API响应"""
key = f"api_response:{request_hash}"
policy = self.cache_policies['api_response']
return await self.cache_manager.set(
key,
response,
ttl=policy['ttl']
)
async def get_cached_api_response(
self,
request_hash: str
) -> Optional[Dict]:
"""获取缓存的API响应"""
key = f"api_response:{request_hash}"
return await self.cache_manager.get(key)
def _compress_conversation(self, data: Dict) -> Dict:
"""压缩对话数据(简化版)"""
import gzip
messages_json = json.dumps(data['messages'])
compressed_messages = gzip.compress(messages_json.encode())
return {
'compressed': True,
'messages': compressed_messages,
'user_id': data['user_id'],
'cached_at': data['cached_at']
}
def _decompress_conversation(self, data: Dict) -> Dict:
"""解压缩对话数据"""
import gzip
decompressed_messages = gzip.decompress(data['messages']).decode()
messages = json.loads(decompressed_messages)
return {
'messages': messages,
'user_id': data['user_id'],
'cached_at': data['cached_at']
}
# Cache warmup and invalidation strategy
class CacheWarmupManager:
def __init__(self, cache_strategy: SmartCacheStrategy):
self.cache_strategy = cache_strategy
self.warmup_tasks = []
async def warmup_user_data(self, user_id: str):
"""预热用户数据"""
# 预加载用户的最近对话
recent_conversations = await self._get_recent_conversations(user_id)
tasks = []
        for conv in recent_conversations[:5]:  # warm only the five most recent conversations
task = self._warmup_conversation(conv['id'])
tasks.append(task)
if tasks:
await asyncio.gather(*tasks, return_exceptions=True)
async def _warmup_conversation(self, conversation_id: str):
"""预热单个对话"""
# 检查是否已缓存
cached = await self.cache_strategy.get_conversation(conversation_id)
if not cached:
            # Load from the database and cache the result
conversation_data = await self._load_conversation_from_db(conversation_id)
if conversation_data:
await self.cache_strategy.cache_conversation(
conversation_id,
conversation_data['messages'],
conversation_data['user_id']
)
async def _get_recent_conversations(self, user_id: str) -> List[Dict]:
"""获取用户最近的对话(模拟)"""
# 实际应从数据库查询
return [
{'id': f'conv_{user_id}_{i}', 'updated_at': datetime.now()}
for i in range(10)
]
async def _load_conversation_from_db(self, conversation_id: str) -> Optional[Dict]:
"""从数据库加载对话(模拟)"""
# 实际应从数据库查询
return {
'messages': [
{'role': 'user', 'content': f'Message in {conversation_id}'}
],
'user_id': 'user123'
}
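cache_api_response is keyed on a request hash that the section leaves undefined; one reasonable choice (an assumption, shown only for illustration) is a SHA-256 digest of the canonical JSON of the request parameters, so that logically identical requests share a cache entry.
def make_request_hash(model: str, messages: List[Dict], max_tokens: int) -> str:
    # Canonical JSON (sorted keys) so equivalent requests hash identically
    payload = json.dumps(
        {'model': model, 'messages': messages, 'max_tokens': max_tokens},
        sort_keys=True, ensure_ascii=False
    )
    return hashlib.sha256(payload.encode('utf-8')).hexdigest()

# Usage with SmartCacheStrategy:
#   h = make_request_hash(model, messages, max_tokens)
#   cached = await strategy.get_cached_api_response(h)
#   if cached is None:
#       response = ...  # call the Claude API
#       await strategy.cache_api_response(h, response)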
2. Data Sharding and Partitioning
import asyncio
import hashlib
from datetime import datetime
from typing import Any, Dict, List, Optional
class DatabaseShardManager:
def __init__(self, shard_configs: List[Dict]):
self.shards = {}
self.shard_ring = []
for config in shard_configs:
shard_id = config['shard_id']
self.shards[shard_id] = DatabaseShard(config)
            # Build the consistent-hash ring with virtual nodes
for i in range(config.get('virtual_nodes', 100)):
hash_value = self._hash(f"{shard_id}:{i}")
self.shard_ring.append((hash_value, shard_id))
        # Sort the hash ring
self.shard_ring.sort()
def _hash(self, key: str) -> int:
"""计算哈希值"""
return int(hashlib.md5(key.encode()).hexdigest(), 16)
def get_shard(self, key: str) -> 'DatabaseShard':
"""根据键获取对应的分片"""
hash_value = self._hash(key)
# 在哈希环中找到第一个大于等于hash_value的节点
for ring_hash, shard_id in self.shard_ring:
if ring_hash >= hash_value:
return self.shards[shard_id]
        # Wrap around to the first node on the ring
return self.shards[self.shard_ring[0][1]]
async def save_conversation(
self,
conversation_id: str,
conversation_data: Dict
):
"""保存对话到相应分片"""
shard = self.get_shard(conversation_id)
await shard.save_conversation(conversation_id, conversation_data)
async def load_conversation(
self,
conversation_id: str
) -> Optional[Dict]:
"""从相应分片加载对话"""
shard = self.get_shard(conversation_id)
return await shard.load_conversation(conversation_id)
async def save_user_data(
self,
user_id: str,
user_data: Dict
):
"""保存用户数据"""
shard = self.get_shard(user_id)
await shard.save_user_data(user_id, user_data)
async def get_shard_stats(self) -> Dict[str, Any]:
"""获取分片统计信息"""
stats = {}
for shard_id, shard in self.shards.items():
stats[shard_id] = await shard.get_stats()
return stats
class DatabaseShard:
def __init__(self, config: Dict):
self.shard_id = config['shard_id']
self.host = config['host']
self.port = config['port']
self.database = config['database']
self.connection_pool = None
        # Shard statistics
self.stats = {
'total_conversations': 0,
'total_users': 0,
'storage_size': 0,
'last_updated': datetime.now()
}
async def connect(self):
"""连接到数据库"""
# 这里应该初始化真实的数据库连接
print(f"Connected to shard {self.shard_id} at {self.host}:{self.port}")
async def save_conversation(
self,
conversation_id: str,
conversation_data: Dict
):
"""保存对话数据"""
# 实际应保存到数据库
self.stats['total_conversations'] += 1
self.stats['last_updated'] = datetime.now()
print(f"Saved conversation {conversation_id} to shard {self.shard_id}")
async def load_conversation(
self,
conversation_id: str
) -> Optional[Dict]:
"""加载对话数据"""
# 实际应从数据库查询
return {
'id': conversation_id,
'messages': [],
'created_at': datetime.now().isoformat()
}
async def save_user_data(self, user_id: str, user_data: Dict):
"""保存用户数据"""
self.stats['total_users'] += 1
self.stats['last_updated'] = datetime.now()
async def get_stats(self) -> Dict[str, Any]:
"""获取分片统计信息"""
return {
'shard_id': self.shard_id,
'host': self.host,
'port': self.port,
**self.stats
}
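A short routing sketch (shard hosts are placeholders): the same key always maps to the same shard, and the virtual nodes keep the key distribution roughly even across shards.
async def demo_sharding():
    manager = DatabaseShardManager([
        {'shard_id': 'shard-1', 'host': 'db1.internal', 'port': 5432, 'database': 'claude_app'},
        {'shard_id': 'shard-2', 'host': 'db2.internal', 'port': 5432, 'database': 'claude_app'},
    ])
    # Deterministic routing: conv_12345 always lands on the same shard
    print(manager.get_shard('conv_12345').shard_id)
    await manager.save_conversation('conv_12345', {'messages': []})
    print(await manager.get_shard_stats())

asyncio.run(demo_sharding())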
Fault Tolerance and Recovery
1. Service Fault-Tolerance Mechanisms
import asyncio
from enum import Enum
from typing import Any, Callable, Dict, List, Optional
class CircuitBreakerState(Enum):
CLOSED = "closed"
OPEN = "open"
HALF_OPEN = "half_open"
class CircuitBreaker:
def __init__(
self,
failure_threshold: int = 5,
recovery_timeout: int = 60,
expected_exception: type = Exception
):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.expected_exception = expected_exception
self.failure_count = 0
self.last_failure_time = None
self.state = CircuitBreakerState.CLOSED
async def call(self, func: Callable, *args, **kwargs) -> Any:
"""执行函数调用with熔断保护"""
if self.state == CircuitBreakerState.OPEN:
if self._should_attempt_reset():
self.state = CircuitBreakerState.HALF_OPEN
else:
raise Exception("Circuit breaker is OPEN")
try:
result = await func(*args, **kwargs)
self._on_success()
return result
except self.expected_exception as e:
self._on_failure()
raise
def _should_attempt_reset(self) -> bool:
"""检查是否应该尝试重置"""
if self.last_failure_time is None:
return False
return (
asyncio.get_event_loop().time() - self.last_failure_time >
self.recovery_timeout
)
def _on_success(self):
"""成功时的处理"""
self.failure_count = 0
self.state = CircuitBreakerState.CLOSED
def _on_failure(self):
"""失败时的处理"""
self.failure_count += 1
self.last_failure_time = asyncio.get_event_loop().time()
if self.failure_count >= self.failure_threshold:
self.state = CircuitBreakerState.OPEN
# Fault-tolerant service wrapper
class FaultTolerantService:
def __init__(
self,
primary_service: BaseService,
        fallback_services: Optional[List[BaseService]] = None,
        circuit_breaker: Optional[CircuitBreaker] = None
):
self.primary_service = primary_service
self.fallback_services = fallback_services or []
self.circuit_breaker = circuit_breaker or CircuitBreaker()
self.current_service = primary_service
    async def _call_primary(self, request: Dict[str, Any]) -> Dict[str, Any]:
        """Call the primary service, re-raising error results as exceptions."""
        result = await self.primary_service.process_request(request)
        # BaseService.process_request swallows exceptions and returns an error dict,
        # so convert error results back into exceptions for the circuit breaker to count
        if result.get('status') == 'error':
            raise RuntimeError(result.get('error', 'primary service error'))
        return result
    async def process_request(self, request: Dict[str, Any]) -> Dict[str, Any]:
        """Fault-tolerant request handling."""
        try:
            # Try the primary service first
            return await self.circuit_breaker.call(self._call_primary, request)
        except Exception as primary_error:
            print(f"Primary service failed: {primary_error}")
            # Fall back to the backup services in order
            for fallback_service in self.fallback_services:
                try:
                    result = await fallback_service.process_request(request)
                    result['fallback_used'] = True
                    result['primary_error'] = str(primary_error)
                    return result
                except Exception as fallback_error:
                    print(f"Fallback service failed: {fallback_error}")
                    continue
            # All services failed
            return {
                'status': 'error',
                'error': 'All services unavailable',
                'primary_error': str(primary_error)
            }
# Auto-recovery manager
class AutoRecoveryManager:
def __init__(self, service_registry: ServiceRegistry):
self.service_registry = service_registry
self.recovery_strategies = {}
self.monitoring_tasks = {}
def register_recovery_strategy(
self,
service_name: str,
strategy: Callable
):
"""注册恢复策略"""
self.recovery_strategies[service_name] = strategy
async def start_monitoring(self, service_name: str):
"""开始监控服务"""
if service_name in self.monitoring_tasks:
return
task = asyncio.create_task(
self._monitor_service(service_name)
)
self.monitoring_tasks[service_name] = task
async def stop_monitoring(self, service_name: str):
"""停止监控服务"""
if service_name in self.monitoring_tasks:
self.monitoring_tasks[service_name].cancel()
del self.monitoring_tasks[service_name]
async def _monitor_service(self, service_name: str):
"""监控单个服务"""
consecutive_failures = 0
while True:
try:
instances = self.service_registry.discover_services(service_name)
healthy_instances = [i for i in instances if i.is_healthy]
if len(healthy_instances) == 0 and len(instances) > 0:
consecutive_failures += 1
if consecutive_failures >= 3:
await self._trigger_recovery(service_name)
consecutive_failures = 0
else:
consecutive_failures = 0
                await asyncio.sleep(30)  # check every 30 seconds
except asyncio.CancelledError:
break
except Exception as e:
print(f"Monitoring error for {service_name}: {e}")
await asyncio.sleep(10)
async def _trigger_recovery(self, service_name: str):
"""触发服务恢复"""
print(f"Triggering recovery for service: {service_name}")
if service_name in self.recovery_strategies:
try:
await self.recovery_strategies[service_name]()
except Exception as e:
print(f"Recovery failed for {service_name}: {e}")
else:
            # Default recovery strategy: restart the service
await self._default_recovery(service_name)
async def _default_recovery(self, service_name: str):
"""默认恢复策略"""
# 简化的重启逻辑
print(f"Attempting to restart {service_name}")
# 这里应该实现真实的服务重启逻辑
# 例如:调用容器编排系统的API
await asyncio.sleep(5) # 模拟重启时间
print(f"Service {service_name} recovery attempted")
Monitoring and Operations
1. Distributed Monitoring System
import time
import asyncio
from dataclasses import dataclass
from typing import Any, Dict, List, Optional
@dataclass
class Metric:
name: str
value: float
timestamp: float
tags: Dict[str, str]
    unit: Optional[str] = None
class MetricsCollector:
def __init__(self):
self.metrics = []
self.counters = {}
self.gauges = {}
self.histograms = {}
    def counter(self, name: str, value: float = 1, tags: Optional[Dict[str, str]] = None):
        """Record a counter metric."""
key = f"{name}:{tags or {}}"
self.counters[key] = self.counters.get(key, 0) + value
self.metrics.append(Metric(
name=name,
value=self.counters[key],
timestamp=time.time(),
tags=tags or {},
unit="count"
))
    def gauge(self, name: str, value: float, tags: Optional[Dict[str, str]] = None):
        """Record a gauge (point-in-time) metric."""
key = f"{name}:{tags or {}}"
self.gauges[key] = value
self.metrics.append(Metric(
name=name,
value=value,
timestamp=time.time(),
tags=tags or {},
unit="gauge"
))
def histogram(
self,
name: str,
value: float,
        tags: Optional[Dict[str, str]] = None
):
"""记录直方图指标"""
key = f"{name}:{tags or {}}"
if key not in self.histograms:
self.histograms[key] = {
'count': 0,
'sum': 0,
'min': float('inf'),
'max': float('-inf'),
'values': []
}
hist = self.histograms[key]
hist['count'] += 1
hist['sum'] += value
hist['min'] = min(hist['min'], value)
hist['max'] = max(hist['max'], value)
hist['values'].append(value)
        # Keep only the most recent 1000 values
if len(hist['values']) > 1000:
hist['values'] = hist['values'][-1000:]
self.metrics.append(Metric(
name=name,
value=value,
timestamp=time.time(),
tags=tags or {},
unit="histogram"
))
    def get_metrics(self, since: Optional[float] = None) -> List[Metric]:
        """Get collected metrics, optionally only those recorded since a timestamp."""
if since is None:
return self.metrics.copy()
return [m for m in self.metrics if m.timestamp >= since]
def clear_metrics(self):
"""清除指标"""
self.metrics.clear()
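# Example usage of MetricsCollector (a sketch; the metric names are illustrative):
collector = MetricsCollector()
collector.counter('requests.total', tags={'service': 'claude-service'})
collector.gauge('queue.depth', 12, tags={'service': 'claude-service'})
collector.histogram('request.latency_ms', 42.5, tags={'service': 'claude-service'})
recent = collector.get_metrics(since=time.time() - 60)  # metrics from the last minute
print(f"{len(recent)} metrics in the last minute")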
# System monitor
class SystemMonitor:
def __init__(self, metrics_collector: MetricsCollector):
self.metrics = metrics_collector
self.monitoring_task = None
self.is_running = False
async def start(self):
"""开始监控"""
self.is_running = True
self.monitoring_task = asyncio.create_task(self._monitoring_loop())
async def stop(self):
"""停止监控"""
self.is_running = False
if self.monitoring_task:
self.monitoring_task.cancel()
async def _monitoring_loop(self):
"""监控循环"""
while self.is_running:
try:
await self._collect_system_metrics()
                await asyncio.sleep(10)  # collect every 10 seconds
except asyncio.CancelledError:
break
except Exception as e:
print(f"Monitoring error: {e}")
await asyncio.sleep(5)
    async def _collect_system_metrics(self):
        """Collect system metrics."""
        import psutil
        # CPU usage (interval=None compares against the previous call instead of
        # blocking the event loop for a full second)
        cpu_percent = psutil.cpu_percent(interval=None)
        self.metrics.gauge('system.cpu.usage', cpu_percent, {'unit': 'percent'})
        # Memory usage
        memory = psutil.virtual_memory()
        self.metrics.gauge('system.memory.usage', memory.percent, {'unit': 'percent'})
        self.metrics.gauge('system.memory.available', memory.available, {'unit': 'bytes'})
        # Disk usage
        disk = psutil.disk_usage('/')
        disk_percent = (disk.used / disk.total) * 100
        self.metrics.gauge('system.disk.usage', disk_percent, {'unit': 'percent'})
        # Network I/O (these counters are cumulative since boot, so record them
        # as gauges rather than re-adding them to a counter on every pass)
        network = psutil.net_io_counters()
        self.metrics.gauge('system.network.bytes_sent', network.bytes_sent, {'unit': 'bytes'})
        self.metrics.gauge('system.network.bytes_recv', network.bytes_recv, {'unit': 'bytes'})
# Distributed tracing
class DistributedTracer:
def __init__(self):
self.active_traces = {}
self.completed_traces = []
def start_span(
self,
operation_name: str,
        parent_span_id: Optional[str] = None,
        tags: Optional[Dict[str, Any]] = None
) -> 'Span':
"""开始一个新的span"""
span = Span(
operation_name=operation_name,
parent_span_id=parent_span_id,
tags=tags or {}
)
self.active_traces[span.span_id] = span
return span
def finish_span(self, span: 'Span'):
"""完成span"""
span.finish()
if span.span_id in self.active_traces:
del self.active_traces[span.span_id]
self.completed_traces.append(span)
        # Keep only the most recent 1000 spans
if len(self.completed_traces) > 1000:
self.completed_traces = self.completed_traces[-1000:]
def get_trace(self, trace_id: str) -> List['Span']:
"""获取完整的追踪"""
return [
span for span in self.completed_traces
if span.trace_id == trace_id
]
@dataclass
class Span:
    operation_name: str
    parent_span_id: Optional[str] = None
    tags: Optional[Dict[str, Any]] = None
    span_id: Optional[str] = None
    trace_id: Optional[str] = None
    start_time: Optional[float] = None
    end_time: Optional[float] = None
    duration: Optional[float] = None
    def __post_init__(self):
        import uuid
        self.span_id = str(uuid.uuid4())
        # Simplified: the parent's span id doubles as the trace id; a real tracer
        # would propagate the root trace id through the whole call chain
        self.trace_id = self.parent_span_id or str(uuid.uuid4())
        self.start_time = time.time()
        self.tags = self.tags or {}
def finish(self):
"""完成span"""
self.end_time = time.time()
self.duration = self.end_time - self.start_time
def add_tag(self, key: str, value: Any):
"""添加标签"""
self.tags[key] = value
def log(self, message: str):
"""添加日志"""
if 'logs' not in self.tags:
self.tags['logs'] = []
self.tags['logs'].append({
'timestamp': time.time(),
'message': message
})
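A minimal tracing sketch (the operation names are illustrative): start a root span, attach a child via parent_span_id, then look the trace up. Note the simplification flagged above: children group under the root span's id, while the root itself carries an independent trace id.
tracer = DistributedTracer()
root = tracer.start_span('handle_request', tags={'service': 'api-gateway'})
child = tracer.start_span('call_claude', parent_span_id=root.span_id)
child.log('sending request to claude-service')
tracer.finish_span(child)
tracer.finish_span(root)
print([s.operation_name for s in tracer.get_trace(root.span_id)])  # ['call_claude']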
Deployment Automation
1. Containerized Deployment
# Example Dockerfile configuration
DOCKERFILE_TEMPLATE = """
FROM python:3.9-slim
WORKDIR /app
# Install curl for the container health check (not included in slim images)
RUN apt-get update && apt-get install -y --no-install-recommends curl \\
    && rm -rf /var/lib/apt/lists/*
# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy the application code
COPY . .
# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \\
    CMD curl -f http://localhost:8080/health || exit 1
# Start command
CMD ["python", "app.py"]
"""
# Docker Compose configuration
DOCKER_COMPOSE_TEMPLATE = """
version: '3.8'
services:
api-gateway:
build: ./gateway
ports:
- "8080:8080"
environment:
- SERVICE_NAME=api-gateway
- REGISTRY_URL=http://service-registry:8500
depends_on:
- service-registry
- redis
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8080/health"]
interval: 30s
timeout: 10s
retries: 3
claude-service:
build: ./claude-service
deploy:
replicas: 3
environment:
- SERVICE_NAME=claude-service
- ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
- REGISTRY_URL=http://service-registry:8500
depends_on:
- service-registry
- redis
auth-service:
build: ./auth-service
deploy:
replicas: 2
environment:
- SERVICE_NAME=auth-service
- REGISTRY_URL=http://service-registry:8500
depends_on:
- service-registry
- postgres
redis:
image: redis:6-alpine
ports:
- "6379:6379"
volumes:
- redis_data:/data
postgres:
image: postgres:13
environment:
- POSTGRES_DB=claude_app
- POSTGRES_USER=app_user
- POSTGRES_PASSWORD=app_password
volumes:
- postgres_data:/var/lib/postgresql/data
service-registry:
image: consul:latest
ports:
- "8500:8500"
command: consul agent -dev -client=0.0.0.0
volumes:
redis_data:
postgres_data:
"""
# Kubernetes deployment configuration
K8S_DEPLOYMENT_TEMPLATE = """
apiVersion: apps/v1
kind: Deployment
metadata:
name: claude-service
labels:
app: claude-service
spec:
replicas: 3
selector:
matchLabels:
app: claude-service
template:
metadata:
labels:
app: claude-service
spec:
containers:
- name: claude-service
image: claude-app/claude-service:latest
ports:
- containerPort: 8080
env:
- name: ANTHROPIC_API_KEY
valueFrom:
secretKeyRef:
name: claude-secrets
key: api-key
- name: REDIS_URL
value: "redis://redis-service:6379"
resources:
requests:
memory: "256Mi"
cpu: "250m"
limits:
memory: "512Mi"
cpu: "500m"
livenessProbe:
httpGet:
path: /health
port: 8080
initialDelaySeconds: 30
periodSeconds: 10
readinessProbe:
httpGet:
path: /ready
port: 8080
initialDelaySeconds: 5
periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
name: claude-service
spec:
selector:
app: claude-service
ports:
- protocol: TCP
port: 80
targetPort: 8080
type: ClusterIP
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: claude-service-hpa
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: claude-service
minReplicas: 2
maxReplicas: 10
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70
- type: Resource
resource:
name: memory
target:
type: Utilization
averageUtilization: 80
"""
from typing import Dict
class DeploymentManager:
def __init__(self):
self.environments = {
'development': {
'replicas': 1,
'resources': {'memory': '256Mi', 'cpu': '100m'}
},
'staging': {
'replicas': 2,
'resources': {'memory': '512Mi', 'cpu': '250m'}
},
'production': {
'replicas': 3,
'resources': {'memory': '1Gi', 'cpu': '500m'}
}
}
def generate_k8s_manifests(
self,
service_name: str,
environment: str,
image_tag: str
) -> Dict[str, str]:
"""生成Kubernetes部署清单"""
env_config = self.environments.get(environment, self.environments['production'])
manifests = {
'deployment': self._generate_deployment_manifest(
service_name, environment, image_tag, env_config
),
'service': self._generate_service_manifest(service_name),
'hpa': self._generate_hpa_manifest(service_name, env_config)
}
return manifests
def _generate_deployment_manifest(
self,
service_name: str,
environment: str,
image_tag: str,
config: Dict
) -> str:
"""生成Deployment清单"""
return f"""
apiVersion: apps/v1
kind: Deployment
metadata:
name: {service_name}
namespace: {environment}
labels:
app: {service_name}
environment: {environment}
spec:
replicas: {config['replicas']}
selector:
matchLabels:
app: {service_name}
template:
metadata:
labels:
app: {service_name}
environment: {environment}
spec:
containers:
- name: {service_name}
image: claude-app/{service_name}:{image_tag}
ports:
- containerPort: 8080
resources:
requests:
memory: {config['resources']['memory']}
cpu: {config['resources']['cpu']}
limits:
memory: {config['resources']['memory']}
cpu: {config['resources']['cpu']}
livenessProbe:
httpGet:
path: /health
port: 8080
initialDelaySeconds: 30
periodSeconds: 10
readinessProbe:
httpGet:
path: /ready
port: 8080
initialDelaySeconds: 5
periodSeconds: 5
"""
def _generate_service_manifest(self, service_name: str) -> str:
"""生成Service清单"""
return f"""
apiVersion: v1
kind: Service
metadata:
name: {service_name}
spec:
selector:
app: {service_name}
ports:
- protocol: TCP
port: 80
targetPort: 8080
type: ClusterIP
"""
def _generate_hpa_manifest(self, service_name: str, config: Dict) -> str:
"""生成HPA清单"""
max_replicas = config['replicas'] * 3
return f"""
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: {service_name}-hpa
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: {service_name}
minReplicas: {config['replicas']}
maxReplicas: {max_replicas}
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70
"""
This distributed deployment architecture provides a complete microservice deployment blueprint, covering service discovery, load balancing, fault tolerance, monitoring, and automated deployment.