Python实例题:基于 Apache Kafka 的实时数据流处理平台

发布于:2025-06-21 ⋅ 阅读:(18) ⋅ 点赞:(0)

目录

Python实例题

题目

问题描述

解题思路

关键代码框架

难点分析

扩展方向

Python实例题

题目

基于 Apache Kafka 的实时数据流处理平台

问题描述

开发一个基于 Apache Kafka 的实时数据流处理平台,包含以下功能:

  • 数据生产者:从多个数据源收集数据
  • Kafka 集群:分布式消息队列存储数据流
  • 流处理引擎:实时处理和转换数据流
  • 数据消费者:将处理后的数据写入目标系统
  • 监控与管理:监控 Kafka 集群和数据流处理状态

解题思路

  • 搭建 Kafka 集群实现高可用消息队列
  • 开发数据生产者从不同数据源收集数据
  • 使用 Kafka Streams 或 Apache Flink 实现流处理
  • 设计数据消费者将处理结果写入目标系统
  • 集成监控工具监控集群和流处理状态

关键代码框架

# 数据生产者示例 - 从API获取数据并发送到Kafka
import requests
import json
from kafka import KafkaProducer
import time
import logging

# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# 配置Kafka
KAFKA_BOOTSTRAP_SERVERS = ['localhost:9092']
KAFKA_TOPIC = 'api-data-stream'

# 创建Kafka生产者
producer = KafkaProducer(
    bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    key_serializer=lambda k: str(k).encode('utf-8')
)

# API配置
API_URL = 'https://api.example.com/data'
API_KEY = 'your_api_key'

def fetch_data_from_api():
    """从API获取数据"""
    headers = {'Authorization': f'Bearer {API_KEY}'}
    
    try:
        response = requests.get(API_URL, headers=headers)
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as e:
        logger.error(f"API请求失败: {e}")
        return None

def send_to_kafka(data):
    """将数据发送到Kafka"""
    if not data:
        return
    
    for item in data:
        # 使用数据中的唯一ID作为键
        key = item.get('id', None)
        
        try:
            # 发送消息到Kafka
            future = producer.send(KAFKA_TOPIC, value=item, key=key)
            
            # 等待确认(可选)
            record_metadata = future.get(timeout=10)
            logger.info(f"消息发送成功 - 主题: {record_metadata.topic}, 分区: {record_metadata.partition}, 偏移量: {record_metadata.offset}")
        except Exception as e:
            logger.error(f"消息发送失败: {e}")

def run_producer():
    """运行生产者循环"""
    logger.info("启动数据生产者...")
    
    try:
        while True:
            # 从API获取数据
            data = fetch_data_from_api()
            
            # 发送数据到Kafka
            send_to_kafka(data)
            
            # 等待一段时间再获取下一批数据
            time.sleep(60)  # 每分钟获取一次数据
            
    except KeyboardInterrupt:
        logger.info("生产者被用户中断")
    finally:
        # 关闭生产者连接
        producer.close()
        logger.info("生产者已关闭")

if __name__ == "__main__":
    run_producer()
# 流处理示例 - 使用Kafka Streams处理实时数据
from kafka import KafkaConsumer, KafkaProducer
from kafka.streams import KafkaStreams, Processor, Stream
import json
import logging

# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# 配置Kafka
KAFKA_BOOTSTRAP_SERVERS = ['localhost:9092']
INPUT_TOPIC = 'api-data-stream'
OUTPUT_TOPIC = 'processed-data-stream'

# 定义处理器
class DataProcessor(Processor):
    def process(self, key, value):
        try:
            # 解析JSON数据
            data = json.loads(value)
            
            # 数据转换示例 - 添加处理时间戳
            data['processed_at'] = str(int(time.time()))
            
            # 数据过滤示例 - 只处理特定类型的数据
            if data.get('type') == 'important':
                # 将处理后的数据发送到输出主题
                self.context.forward(key, json.dumps(data).encode('utf-8'))
                
        except json.JSONDecodeError as e:
            logger.error(f"JSON解析错误: {e}")
        except Exception as e:
            logger.error(f"处理数据时出错: {e}")

# 定义流处理拓扑
def create_stream():
    stream_builder = Stream()
    
    # 从输入主题读取数据
    stream = stream_builder.stream(INPUT_TOPIC)
    
    # 应用处理器
    stream.process(DataProcessor)
    
    # 将结果写入输出主题
    stream.to(OUTPUT_TOPIC)
    
    return stream_builder

# 运行流处理应用
def run_stream_processor():
    logger.info("启动流处理应用...")
    
    # 创建Kafka配置
    config = {
        'bootstrap.servers': KAFKA_BOOTSTRAP_SERVERS,
        'application.id': 'data-processing-app',
        'group.id': 'data-processing-group',
        'auto.offset.reset': 'earliest'
    }
    
    # 创建并启动流处理应用
    stream_builder = create_stream()
    kafka_streams = KafkaStreams(stream_builder, config)
    
    try:
        kafka_streams.start()
        logger.info("流处理应用已启动")
        
        # 保持应用运行
        while True:
            time.sleep(1)
            
    except KeyboardInterrupt:
        logger.info("流处理应用被用户中断")
    finally:
        # 关闭流处理应用
        kafka_streams.close()
        logger.info("流处理应用已关闭")

if __name__ == "__main__":
    run_stream_processor()
# 数据消费者示例 - 将处理后的数据写入数据库
import json
from kafka import KafkaConsumer
from sqlalchemy import create_engine, Column, Integer, String, DateTime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
import datetime
import logging

# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# 配置Kafka
KAFKA_BOOTSTRAP_SERVERS = ['localhost:9092']
KAFKA_TOPIC = 'processed-data-stream'

# 配置数据库
DB_URL = 'postgresql://user:password@localhost:5432/stream_data'
engine = create_engine(DB_URL)
Base = declarative_base()
Session = sessionmaker(bind=engine)

# 定义数据模型
class ProcessedData(Base):
    __tablename__ = 'processed_data'
    
    id = Column(Integer, primary_key=True)
    data_id = Column(String(50), index=True)
    type = Column(String(50))
    value = Column(String)
    processed_at = Column(DateTime)
    created_at = Column(DateTime, default=datetime.datetime.utcnow)

# 创建表(如果不存在)
Base.metadata.create_all(engine)

def consume_and_store():
    """消费Kafka消息并存储到数据库"""
    logger.info("启动数据消费者...")
    
    # 创建Kafka消费者
    consumer = KafkaConsumer(
        KAFKA_TOPIC,
        bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
        auto_offset_reset='earliest',
        group_id='database-writer-group',
        value_deserializer=lambda m: json.loads(m.decode('utf-8'))
    )
    
    try:
        # 消费消息
        for message in consumer:
            try:
                # 获取消息数据
                data = message.value
                
                # 创建数据库会话
                session = Session()
                
                try:
                    # 创建数据库记录
                    db_record = ProcessedData(
                        data_id=data.get('id'),
                        type=data.get('type'),
                        value=json.dumps(data),
                        processed_at=datetime.datetime.fromtimestamp(int(data.get('processed_at', 0)))
                    )
                    
                    # 添加并提交
                    session.add(db_record)
                    session.commit()
                    
                    logger.info(f"成功存储数据到数据库 - ID: {data.get('id')}")
                except Exception as e:
                    logger.error(f"存储数据到数据库失败: {e}")
                    session.rollback()
                finally:
                    session.close()
                    
            except json.JSONDecodeError as e:
                logger.error(f"JSON解析错误: {e}")
            except Exception as e:
                logger.error(f"处理消息时出错: {e}")
                
    except KeyboardInterrupt:
        logger.info("消费者被用户中断")
    finally:
        # 关闭消费者连接
        consumer.close()
        logger.info("消费者已关闭")

if __name__ == "__main__":
    consume_and_store()

难点分析

  • Kafka 集群配置:确保高可用性和数据持久性
  • 消息序列化与反序列化:处理不同格式的数据
  • 流处理语义:实现精确一次处理语义
  • 数据一致性:跨多个服务保证数据一致性
  • 监控与调优:监控 Kafka 集群性能并进行调优

扩展方向

  • 添加更多数据源和目标系统支持
  • 实现更复杂的流处理逻辑
  • 添加数据分区和负载均衡策略
  • 集成分布式追踪系统
  • 实现自动扩容和故障恢复机制

网站公告

今日签到

点亮在社区的每一天
去签到