Stream Processing, Real-Time Analytics, and a RAG-Driven Python ETL Framework: Building Intelligent Data Pipelines (Part 2)

Published: 2025-08-16

Chapter 4: Core Modules, Python Implementation in Detail


4.1 Data Ingestion Module: A Generic Receiver Built on FastAPI + Kafka
# fastapi_kafka_ingestor.py
import asyncio
import json
import logging
import os
from datetime import datetime
from typing import Any, Dict, Optional

from confluent_kafka import KafkaException, Producer
from fastapi import BackgroundTasks, FastAPI, HTTPException
from pydantic import BaseModel, Field

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Kafka configuration (should come from environment variables or a config service; a minimal override sketch follows below)
KAFKA_BROKERS = "kafka1:9092,kafka2:9092"
KAFKA_TOPIC = "raw_events"
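
# A minimal sketch of the environment-driven configuration suggested above
# (the environment variable names KAFKA_BROKERS / KAFKA_TOPIC are assumptions,
# falling back to the hard-coded defaults when unset):
KAFKA_BROKERS = os.getenv("KAFKA_BROKERS", KAFKA_BROKERS)
KAFKA_TOPIC = os.getenv("KAFKA_TOPIC", KAFKA_TOPIC)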

# Pydantic model for request validation
class EventModel(BaseModel):
    event_id: str = Field(..., description="Unique event identifier")
    event_type: str = Field(..., description="Type of the event")
    source: str = Field(..., description="Source system of the event")
    timestamp: Optional[str] = Field(None, description="Event timestamp (ISO 8601)")
    payload: Dict[str, Any] = Field(..., description="Event data payload")

# Kafka producer configuration
conf = {
    'bootstrap.servers': KAFKA_BROKERS,
    'client.id': 'fastapi_ingestor',
    # Optional: enable compression
    # 'compression.codec': 'snappy',
    # Optional: enable batching
    # 'batch.num.messages': 100,
    # 'linger.ms': 10,
    # Optional: acknowledgement guarantees
    # 'acks': 'all',
    # Optional: retries
    # 'retries': 3,
    # 'retry.backoff.ms': 100,
}

# FastAPI application
app = FastAPI(
    title="Real-Time Event Ingestion API",
    description="API for ingesting events into Kafka",
    version="1.0.0"
)

# Send a message to Kafka asynchronously
async def produce_event(topic: str, key: str, value: dict):
    # Note: when invoked via BackgroundTasks the HTTP response has already been
    # sent, so the HTTPExceptions raised below surface only in the server logs.
    loop = asyncio.get_running_loop()
    try:
        # Run the synchronous producer.produce in a worker thread so the event
        # loop is never blocked (produce raises BufferError when the local
        # queue is full)
        await loop.run_in_executor(
            None,
            lambda: producer.produce(topic, key=key, value=json.dumps(value).encode('utf-8'))
        )
        producer.poll(0)  # serve delivery callbacks
        logger.info(f"Event with key '{key}' sent to topic '{topic}'")
    except BufferError:
        logger.error(f"Kafka producer buffer full for key '{key}'")
        raise HTTPException(status_code=503, detail="Service temporarily unavailable (Kafka buffer full)")
    except KafkaException as e:
        logger.error(f"Kafka error for key '{key}': {e}")
        raise HTTPException(status_code=500, detail=f"Internal server error (Kafka: {e})")

# Delivery report callback (optional; confirms whether each message was delivered)
def delivery_report(err, msg):
    if err is not None:
        logger.error(f'Message delivery failed: {err}')
    else:
        logger.info(f'Message delivered to {msg.topic()} [{msg.partition()}]')

# Create the producer once, registering the delivery report callback
producer = Producer({**conf, 'on_delivery': delivery_report})
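
# Optional safeguard sketch: flush locally buffered messages on shutdown so
# queued events are not lost; flush() blocks until outstanding deliveries
# complete or the timeout (in seconds) elapses.
@app.on_event("shutdown")
def flush_producer():
    producer.flush(10)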

# API endpoint: ingest a single event
@app.post("/events/", response_model=Dict[str, str], status_code=202)
async def ingest_event(event: EventModel, background_tasks: BackgroundTasks):
    """
    Ingest a single event into the Kafka topic.
    """
    try:
        # 如果没有提供时间戳,使用当前时间
        if not event.timestamp:
            from datetime import datetime
            event.timestamp = datetime.utcnow().isoformat() + "Z"
        
        # 构造Kafka消息
        kafka_message = event.dict()
        kafka_key = event.event_id  # 使用event_id作为Kafka key保证顺序性
        
        # 异步发送消息 (使用BackgroundTasks避免阻塞响应)
        background_tasks.add_task(produce_event, KAFKA_TOPIC, kafka_key, kafka_message)
        
        return {
   
   "status": "accepted", "event_id": event.event_id}
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Unexpected error ingesting event {
     
     event.event_id}: {
     
     e}")
        raise HTTPException(status_code=500, detail="Internal server error")

# API endpoint: ingest a batch of events
@app.post("/events/batch/", response_model=Dict[str, Any], status_code=207)
async def ingest_events_batch(events: list[EventModel], background_tasks: BackgroundTasks):
    """
    Ingest a batch of events into the Kafka topic.
    Returns a multi-status response indicating success/failure per event.
    """
    results = []
    success_count = 0
    failure_count = 0

    for event in events:
        try:
            if not event.timestamp:
                event.timestamp = datetime.utcnow().isoformat() + "Z"

            kafka_message = event.dict()
            kafka_key = event.event_id

            background_tasks.add_task(produce_event, KAFKA_TOPIC, kafka_key, kafka_message)
            results.append({"event_id": event.event_id, "status": "accepted"})
            success_count += 1
        except Exception as e:
            logger.error(f"Failed to enqueue event {event.event_id}: {e}")
            results.append({"event_id": event.event_id, "status": "failed", "error": str(e)})
            failure_count += 1

    return {
        "total": len(events),
        "accepted": success_count,
        "failed": failure_count,
        "results": results,
    }

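With the module saved as fastapi_kafka_ingestor.py and a broker reachable at the configured addresses, a quick smoke test might look like the sketch below (httpx and the sample event values are assumptions, not part of the service itself):

# Start the service first, e.g.:
#   uvicorn fastapi_kafka_ingestor:app --host 0.0.0.0 --port 8000
import httpx

event = {
    "event_id": "evt-0001",  # illustrative values
    "event_type": "page_view",
    "source": "web",
    "payload": {"url": "/home", "user_id": "u-42"},
}

resp = httpx.post("http://localhost:8000/events/", json=event)
print(resp.status_code, resp.json())  # expect: 202 {'status': 'accepted', 'event_id': 'evt-0001'}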