Chapter 4: Detailed Python Implementations of the Core Modules

4.1 Data Ingestion Module: A Generic Receiver Built on FastAPI + Kafka

The listing below implements the ingestion layer as an asynchronous HTTP service: FastAPI validates incoming events against a Pydantic model, and a shared confluent_kafka Producer forwards accepted events to the raw_events topic without blocking the event loop.
import asyncio
import json
import logging
from datetime import datetime
from typing import Any, Dict, Optional

from confluent_kafka import KafkaException, Producer
from fastapi import BackgroundTasks, FastAPI, HTTPException
from pydantic import BaseModel, Field

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

KAFKA_BROKERS = "kafka1:9092,kafka2:9092"
KAFKA_TOPIC = "raw_events"
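# In a real deployment these would usually come from the environment rather
# than hard-coded constants; a sketch (variable names are assumptions):
#   KAFKA_BROKERS = os.getenv("KAFKA_BROKERS", "kafka1:9092,kafka2:9092")
#   KAFKA_TOPIC = os.getenv("KAFKA_TOPIC", "raw_events")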

class EventModel(BaseModel):
    event_id: str = Field(..., description="Unique event identifier")
    event_type: str = Field(..., description="Type of the event")
    source: str = Field(..., description="Source system of the event")
    timestamp: Optional[str] = Field(None, description="Event timestamp (ISO 8601)")
    payload: Dict[str, Any] = Field(..., description="Event data payload")

conf = {
    'bootstrap.servers': KAFKA_BROKERS,
    'client.id': 'fastapi_ingestor',
}

def delivery_report(err, msg):
    """Per-message delivery callback, served by the producer's poll()/flush()."""
    if err is not None:
        logger.error(f'Message delivery failed: {err}')
    else:
        logger.info(f'Message delivered to {msg.topic()} [{msg.partition()}]')

# Single shared producer; the delivery callback is registered once at
# construction time.
producer = Producer({**conf, 'on_delivery': delivery_report})
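# For production, delivery guarantees are usually tightened. A sketch of a
# stricter configuration; the keys are standard librdkafka settings, but the
# values are illustrative, not tuned recommendations:
strict_conf = {
    **conf,
    'acks': 'all',                # wait for all in-sync replicas to ack
    'enable.idempotence': True,   # avoid duplicates on producer retries
    'linger.ms': 5,               # small batching window for throughput
    'compression.type': 'lz4',    # compress batches on the wire
}
# e.g. producer = Producer({**strict_conf, 'on_delivery': delivery_report})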

app = FastAPI(
    title="Real-Time Event Ingestion API",
    description="API for ingesting events into Kafka",
    version="1.0.0"
)

async def produce_event(topic: str, key: str, value: dict):
    """Hand a message to the Kafka producer without blocking the event loop.

    Note: when this coroutine runs as a background task, the HTTP response
    has already been sent, so the HTTPExceptions raised below surface in the
    server logs rather than to the client.
    """
    loop = asyncio.get_running_loop()
    try:
        await loop.run_in_executor(
            None,
            lambda: producer.produce(topic, key=key, value=json.dumps(value).encode('utf-8'))
        )
        producer.poll(0)  # serve pending delivery callbacks
        logger.info(f"Event with key '{key}' sent to topic '{topic}'")
    except BufferError:
        logger.error(f"Kafka producer buffer full for key '{key}'")
        raise HTTPException(status_code=503, detail="Service temporarily unavailable (Kafka buffer full)")
    except KafkaException as e:
        logger.error(f"Kafka error for key '{key}': {e}")
        raise HTTPException(status_code=500, detail=f"Internal server error (Kafka: {e})")
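
# A hypothetical shutdown hook, not part of the original listing: flush()
# blocks until outstanding deliveries complete, so buffered messages are not
# lost when the process exits.
@app.on_event("shutdown")
def flush_producer():
    producer.flush(10)  # wait up to 10 seconds for in-flight messages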
@app.post("/events/", response_model=Dict[str, str], status_code=202)
async def ingest_event(event: EventModel, background_tasks: BackgroundTasks):
"""
Ingest a single event into the Kafka topic.
"""
try:
if not event.timestamp:
from datetime import datetime
event.timestamp = datetime.utcnow().isoformat() + "Z"
kafka_message = event.dict()
kafka_key = event.event_id
background_tasks.add_task(produce_event, KAFKA_TOPIC, kafka_key, kafka_message)
return {
"status": "accepted", "event_id": event.event_id}
except HTTPException:
raise
except Exception as e:
logger.error(f"Unexpected error ingesting event {
event.event_id}: {
e}")
raise HTTPException(status_code=500, detail="Internal server error")
@app.post("/events/batch/", response_model=Dict[str, Any], status_code=207)
async def ingest_events_batch(events: list[EventModel], background_tasks: BackgroundTasks):
"""
Ingest a batch of events into the Kafka topic.
Returns a multi-status response indicating success/failure per event.
"""
results = []
success_count = 0
failure_count = 0
for event in events:
try:
if not event.timestamp:
from datetime import datetime
event.timestamp = datetime.utcnow().isoformat() + "Z"
kafka_message = event.dict()
kafka_key = event.event_id
background_tasks.add_task(produce_event, KAFKA_TOPIC, kafka_key, kafka_message)
results.append({
"event_id": event.event_id, "status": "accepted"}